]> gerrit.simantics Code Review - simantics/platform.git/blob - bundles/org.simantics.history/src/org/simantics/history/impl/CollectorImpl.java
Migrated source code from Simantics SVN
[simantics/platform.git] / bundles / org.simantics.history / src / org / simantics / history / impl / CollectorImpl.java
1 /*******************************************************************************\r
2  * Copyright (c) 2007, 2011 Association for Decentralized Information Management in\r
3  * Industry THTH ry.\r
4  * All rights reserved. This program and the accompanying materials\r
5  * are made available under the terms of the Eclipse Public License v1.0\r
6  * which accompanies this distribution, and is available at\r
7  * http://www.eclipse.org/legal/epl-v10.html\r
8  *\r
9  * Contributors:\r
10  *     VTT Technical Research Centre of Finland - initial API and implementation\r
11  *******************************************************************************/\r
12 package org.simantics.history.impl;\r
13 \r
14 import gnu.trove.set.hash.THashSet;\r
15 \r
16 import java.io.IOException;\r
17 import java.util.ArrayList;\r
18 import java.util.List;\r
19 import java.util.Set;\r
20 import java.util.UUID;\r
21 import java.util.concurrent.CopyOnWriteArrayList;\r
22 import java.util.logging.Level;\r
23 import java.util.logging.Logger;\r
24 \r
25 import org.simantics.databoard.Bindings;\r
26 import org.simantics.databoard.accessor.StreamAccessor;\r
27 import org.simantics.databoard.accessor.error.AccessorException;\r
28 import org.simantics.databoard.adapter.AdaptException;\r
29 import org.simantics.databoard.adapter.Adapter;\r
30 import org.simantics.databoard.adapter.AdapterConstructionException;\r
31 import org.simantics.databoard.binding.Binding;\r
32 import org.simantics.databoard.binding.DoubleBinding;\r
33 import org.simantics.databoard.binding.FloatBinding;\r
34 import org.simantics.databoard.binding.NumberBinding;\r
35 import org.simantics.databoard.binding.RecordBinding;\r
36 import org.simantics.databoard.binding.error.BindingConstructionException;\r
37 import org.simantics.databoard.binding.error.BindingException;\r
38 import org.simantics.databoard.binding.error.RuntimeBindingException;\r
39 import org.simantics.databoard.binding.impl.DoubleBindingDefault;\r
40 import org.simantics.databoard.binding.mutable.MutableVariant;\r
41 import org.simantics.databoard.binding.mutable.Variant;\r
42 import org.simantics.databoard.binding.reflection.VoidBinding;\r
43 import org.simantics.databoard.serialization.Serializer;\r
44 import org.simantics.databoard.type.Component;\r
45 import org.simantics.databoard.type.Datatype;\r
46 import org.simantics.databoard.type.NumberType;\r
47 import org.simantics.databoard.type.RecordType;\r
48 import org.simantics.databoard.util.Bean;\r
49 import org.simantics.history.Collector;\r
50 import org.simantics.history.HistoryException;\r
51 import org.simantics.history.HistoryManager;\r
52 import org.simantics.history.impl.CollectorState.Item;\r
53 import org.simantics.history.impl.CollectorState.VariableState;\r
54 import org.simantics.history.util.ValueBand;\r
55 import org.simantics.history.util.WeightedMedian;\r
56 import org.simantics.history.util.subscription.SubscriptionItem;\r
57 \r
/**
 * A collector records data into the streams of a history manager.
 *
 * For every item there is a collector item state ({@link CollectorState.Item})
 * that contains the data required by the writing process. This state is read
 * from and written to the history manager item (bean) as meta-data.
 *
 * Stream files are opened on the first call to
 * {@link #beginStep(NumberBinding, Object)}.
 *
 * @author toni.kalajainen
 */
70 public class CollectorImpl implements Collector {\r
71 \r
        /** Logger */
        static Logger logger = Logger.getLogger(CollectorImpl.class.getName());

        /** Argument given to the {@link WeightedMedian} constructor when a median is created in openStream. */
        public static final int MEDIAN_LIMIT = 500;

        /** Tolerance factor; it is multiplied by {@code state.dT} in sampleNum and isExactInterval. */
        public static final double EPSILON = 1e-2;

        /** Collector state */
        CollectorState state;

        /** Open streams, corresponds to subscription items */
        CopyOnWriteArrayList<ActiveStream> items = new CopyOnWriteArrayList<ActiveStream>();

        /** Associated history state */
        HistoryManager history;

        /**
         * True when all items are open. Reset to false by addItem and setState,
         * and set back to true at the end of openAllItems.
         */
        boolean itemsOpen = true;

        /** When this is FlushOnEveryStep, endStep flushes each stream after writing to it. */
        public FlushPolicy flushPolicy = FlushPolicy.FlushOnEveryStep;
92         \r
93         public FlushPolicy getFlushPolicy() {\r
94                 return flushPolicy;\r
95         }\r
96 \r
97         public void setFlushPolicy(FlushPolicy flushPolicy) {\r
98                 this.flushPolicy = flushPolicy;\r
99         }\r
100 \r
101         /**\r
102          * Create a new collector that is associated with a history manager.\r
103          * \r
104          * @param history\r
105          * @throws HistoryException\r
106          */\r
107         public CollectorImpl(HistoryManager history) {\r
108                 this.history = history;\r
109                 this.state = new CollectorState();\r
110                 this.state.init();\r
111                 this.state.id = UUID.randomUUID().toString();\r
112         }\r
113 \r
114         /**\r
115          * Create a new collector that is associated with a history manager.\r
116          * \r
117          * @param history\r
118          * @param previousState\r
119          * @throws HistoryException\r
120          */\r
121         public CollectorImpl(HistoryManager history, CollectorState previousState) {\r
122                 this.history = history;\r
123                 this.state = (CollectorState) previousState.clone();\r
124         }\r
125         \r
126         @Override\r
127         public synchronized void addItem(Bean item) throws HistoryException {\r
128                 try {\r
129                         String id = (String) item.getField("id");\r
130                         \r
131                         // Item State\r
132                         CollectorState.Item itemState = null;\r
133                         if ( item.hasField("collectorState") ) itemState = (CollectorState.Item) item.getField("collectorState", CollectorState.BINDING_ITEM);                  \r
134                         if ( itemState == null ) itemState = state.itemStates.get( id );\r
135                         if ( itemState == null ) {\r
136                                 // Create new item\r
137                                 itemState = new CollectorState.Item();\r
138                                 itemState.init();\r
139                                 itemState.readAvailableFields( item );\r
140                                 // Guess values from the existing data  \r
141                                 if (history.exists(id)) {\r
142                                         readItemState(history, item, itemState);\r
143                                 }\r
144                         }\r
145                         \r
146                         addItem(id, itemState);                 \r
147                 } catch (BindingException e) {\r
148                         throw new HistoryException( e ); \r
149                 }\r
150         }\r
151         \r
152         public synchronized void addItem(String id, CollectorState.Item itemState) throws HistoryException {\r
153                 try {                   \r
154                         // Item State\r
155                         state.itemStates.put(id, itemState);\r
156                         itemsOpen = false;\r
157                         \r
158                         // Time                 \r
159                         if (state.time.type() instanceof NumberType) {                          \r
160                                 double time = (Double) Bindings.adapt(state.time.getValue(), state.time.getBinding(), Bindings.DOUBLE);\r
161                                 if ( !Double.isNaN(itemState.currentTime) ) {\r
162                                         if ( itemState.currentTime > time ) {\r
163                                                 state.time.setValue(Bindings.DOUBLE, itemState.currentTime);\r
164                                         }\r
165                                 }                               \r
166                         }\r
167                 } catch (AdaptException e) {\r
168                         throw new HistoryException( e ); \r
169                 }\r
170         }\r
171         \r
172         public synchronized void addItems(Bean...items) throws HistoryException {\r
173                 for (Bean item : items) addItem(item);\r
174         }\r
175         \r
176         @Override\r
177         public synchronized void removeItem(String id) throws HistoryException {\r
178                 CollectorState.Item itemState = state.itemStates.remove( id );\r
179                 if (itemState!=null && history.exists(id)) {\r
180                         Bean bean = history.getItem(id);\r
181                         bean.readAvailableFields(itemState);\r
182                         try {\r
183                                 bean = setItemState(bean, itemState);\r
184                                 history.modify(bean);\r
185                         } catch (BindingException e) {\r
186                                 // Unexpected\r
187                                 logger.log(Level.FINE, "Failed to update bean", e);                     \r
188                         } catch (BindingConstructionException e) {\r
189                                 // Unexpected\r
190                                 logger.log(Level.FINE, "Failed to update bean", e);\r
191                         } catch (HistoryException e) {\r
192                                 logger.log(Level.FINE, "Failed to update history item meta-data", e);\r
193                         }\r
194                 }\r
195                 \r
196                 ActiveStream as = getStream(id);\r
197                 if ( as != null ) {\r
198                         items.remove( as );\r
199                         as.close();\r
200                 }\r
201         }\r
202         \r
203         ActiveStream getStream(String id) {\r
204                 for (ActiveStream as : items) {\r
205                         if (as.itemState.id.equals(id)) return as;\r
206                 }\r
207                 return null;\r
208         }\r
209         \r
210         /**\r
211          * Set (or Add) item's configuration. \r
212          * \r
213          * @param item item\r
214          * @throws HistoryException\r
215          */\r
216         public synchronized void setItem(Bean item) throws HistoryException {\r
217                 try {\r
218                         String id = (String) item.getField("id");\r
219                         \r
220                         CollectorState.Item itemState = state.itemStates.get( id );\r
221                         if ( itemState==null ) {\r
222                                 addItem(item);\r
223                                 return;\r
224                         }\r
225                         itemState.readAvailableFields(item);\r
226                 } catch (BindingException e) {\r
227                         throw new HistoryException( e ); \r
228                 }\r
229         }\r
230         \r
231         \r
232         /**\r
233          * Set item state to bean. If the bean does not have field collectorState,\r
234          * a new bean class is created, and field added.\r
235          *  \r
236          * @param config\r
237          * @param state\r
238          * @return same or new bean\r
239          * @throws BindingException \r
240          * @throws BindingConstructionException \r
241          */\r
242         public static Bean setItemState(Bean config, CollectorState.Item state) throws BindingException, BindingConstructionException\r
243         {\r
244                 Binding binding = config.getFieldBinding("collectorState");\r
245                 if ( binding == null || !binding.type().equals(CollectorState.BINDING_ITEM.type()) ) {\r
246                         RecordType rt = config.getBinding().type();\r
247                         RecordType newType = new RecordType();\r
248                         for (int i=0; i<rt.getComponentCount(); i++) {\r
249                                 Component c = rt.getComponent(i);\r
250                                 if ( c.name.equals("collectorState") ) continue;\r
251                                 newType.addComponent(c.name, c.type);\r
252                         }\r
253                         newType.addComponent("collectorState", CollectorState.BINDING_ITEM.type());\r
254                         Binding newBinding = Bindings.getBeanBinding(newType);\r
255                         Bean newConfig = (Bean) newBinding.createDefault();\r
256                         newConfig.readAvailableFields( config );\r
257                         config = newConfig;\r
258                 }\r
259                 config.setField("collectorState", CollectorState.BINDING_ITEM, state);\r
260                 return config;\r
261         }\r
262         \r
263         public SubscriptionItem[] getItems() {\r
264                 int c = state.itemStates.size();\r
265                 SubscriptionItem[] items = new SubscriptionItem[ c ];\r
266                 int ix = 0;\r
267                 Serializer s = Bindings.getSerializerUnchecked( CollectorState.BINDING_ITEM );\r
268                 for (CollectorState.Item item : state.itemStates.values()) {\r
269                         _HistoryItem hi = new _HistoryItem();\r
270                         hi.init();\r
271                         hi.readAvailableFields(item);\r
272                         //hi.collectorState = (Item) item.clone();\r
273                         \r
274                         // Workaround (clone doesn't work for WeightedMedian)\r
275                         try {\r
276                                 byte[] data = s.serialize( item );                              \r
277                                 hi.collectorState = (Item) s.deserialize(data);\r
278                         } catch (IOException e) {\r
279                         }\r
280                                         \r
281                         items[ix++] = hi;\r
282                 }\r
283                 return items;\r
284         }\r
285         \r
        /**
         * Subscription item that additionally carries a collector item state.
         * Instances are created and filled by getItems().
         */
        public static class _HistoryItem extends SubscriptionItem {
                /** Item state attached to this subscription item; getItems() stores a copy here. */
                public CollectorState.Item collectorState;
        }
289         \r
290         /**\r
291          * Creates and loads active item. TimeSeries stream file is opened.\r
292          * \r
293          * @param itemState\r
294          * @return new active item\r
295          * @throws HistoryException\r
296          */\r
297         ActiveStream openStream(CollectorState.Item itemState) throws HistoryException \r
298         {\r
299                 // New Active Item\r
300                 ActiveStream x = new ActiveStream();\r
301                 x.itemState = itemState;\r
302 \r
303                 // Stream\r
304                 {\r
305                         if ( !history.exists(x.itemState.id) ) {\r
306                                 throw new HistoryException("TimeSeries "+x.itemState.id+" does not exist.");\r
307                         }\r
308 \r
309                         x.stream = history.openStream(x.itemState.id, "rw");\r
310                 }\r
311 \r
312                 // Sample Binding\r
313                 Datatype sampleType = x.stream.type().componentType;\r
314                 itemState.format = sampleType;\r
315                 try { \r
316                         x.binding = (RecordBinding) Bindings.getBeanBinding( sampleType );\r
317                 } catch ( RuntimeException e ) {\r
318                         System.err.println("Could not create bean binding for type "+sampleType);\r
319                         x.binding = (RecordBinding) Bindings.getBinding( sampleType );          \r
320                 }\r
321 \r
322                 // Value binding\r
323                 x.valueBinding = x.binding.getComponentBinding("value");\r
324                 x.isNumeric = x.valueBinding instanceof NumberBinding;\r
325                 x.hasEndTime = x.binding.getComponentIndex("endTime")>0;\r
326 \r
327                 // Value adapter\r
328                 if ( x.isNumeric ) \r
329                 try {\r
330                         x.valueToDouble = Bindings.adapterFactory.getAdapter(x.valueBinding, Bindings.DOUBLE, true, false);\r
331                         x.doubleToValue = Bindings.adapterFactory.getAdapter(Bindings.DOUBLE, x.valueBinding, true, false);\r
332                 } catch (AdapterConstructionException e1) {\r
333                         x.valueToDouble = null;\r
334                 }\r
335 \r
336                 // Create valueband, and read last entry into memory\r
337                 try {\r
338                         Object currentEntry = x.binding.createDefault();\r
339                         if (x.stream.size() >= 1) {\r
340                                 x.stream.get(x.stream.size() - 1, x.binding, currentEntry);\r
341                         }\r
342                         x.current = new ValueBand(x.binding);\r
343                         x.current.setSample(currentEntry);\r
344                 } catch (AccessorException e) {\r
345                         throw new HistoryException(\r
346                                         "Could not read sample from stream file, " + e, e);\r
347                 } catch (BindingException e) {\r
348                         throw new HistoryException(\r
349                                         "Could not read sample from stream file, " + e, e);\r
350                 }\r
351 \r
352                 // Item State\r
353                 x.itemState = itemState;\r
354 \r
355                 // Median\r
356                 if (x.isNumeric && x.current.hasMedian()\r
357                                 && x.itemState.median == null) {\r
358                         x.itemState.median = new WeightedMedian( MEDIAN_LIMIT );\r
359                 }\r
360 \r
361                 return x;\r
362         }\r
363                         \r
364         /**\r
365          * Read itemState from history. Item must exist. \r
366          * \r
367          * @param si subscription item\r
368          * @return state item\r
369          * @throws RuntimeBindingException\r
370          * @throws HistoryException\r
371          */\r
372         static void readItemState(HistoryManager history, Bean si, CollectorState.Item item) throws RuntimeBindingException, HistoryException {\r
373                 // Create item state\r
374                 item.readAvailableFields(si);\r
375                 // Read state from items\r
376                 StreamAccessor sa = history.openStream(item.id, "r");\r
377                 try {\r
378                         Datatype sampleType = sa.type().componentType;\r
379                         \r
380                         Binding sampleBinding = Bindings.getBinding(sampleType);\r
381                         if (!sampleType.equals( item.format )) throw new HistoryException("Could not create subscription, item \""+item.id+"\" already exists and has wrong format (" + sampleType + " vs. " + item.format + ")");\r
382                         Object sample = sampleBinding.createDefaultUnchecked();\r
383                         ValueBand v = new ValueBand(sampleBinding, sample);\r
384                         item.count = sa.size();\r
385                         if ( item.count>0 ) {\r
386                                 sa.get(0, sampleBinding, sample);\r
387                                 item.firstTime = v.getTimeDouble();\r
388                                 Binding b = v.getValueBinding();\r
389                                 Object value = v.getValue(b);\r
390                                 item.firstValue.setValue(b, value);\r
391                                         \r
392                                 sa.get(item.count-1, sampleBinding, sample);\r
393                                 item.currentTime = v.getTimeDouble();\r
394                                 b = v.getValueBinding();\r
395                                 value = v.getValue(b);\r
396                                 item.currentValue.setValue(b, value);\r
397                                         \r
398                                 if ( item.currentValue.getBinding() instanceof DoubleBinding ) {\r
399                                         DoubleBinding db = (DoubleBinding) item.currentValue.getBinding();\r
400                                         value = item.currentValue.getValue(db);\r
401                                         item.isNaN = Double.isNaN( (Double) value );\r
402                                 }\r
403                         }\r
404                                 \r
405                 } catch (AccessorException e) {\r
406                         throw new HistoryException( e );\r
407                 } catch (AdaptException e) {\r
408                         throw new HistoryException( e );\r
409                 } finally {\r
410                         try { sa.close(); } catch (AccessorException e) {}\r
411                 }\r
412         }\r
413 \r
414         @Override\r
415         public CollectorState getState() {\r
416                 return (CollectorState) state.clone();\r
417         }\r
418 \r
419         @Override\r
420         public void setState(Bean newState) {\r
421                 if (newState == null)\r
422                         return;\r
423                 if (state == null) {\r
424                         state = new CollectorState();\r
425                         state.init();\r
426                 }\r
427                 state.readAvailableFields(newState);\r
428 \r
429                 // Make sure that openAllItems ensures that all streams are opened properly.\r
430                 // This is because setItem will not invoke addItem if collectorState exists\r
431                 // for an item and therefore itemsOpen might not otherwise be set to false.\r
432                 itemsOpen = false;\r
433         }\r
434 \r
435         @Override\r
436         public Object getTime(Binding binding) throws HistoryException {\r
437                 MutableVariant v = state.time;\r
438                 if (v==null) return null;\r
439                 try {\r
440                         return v.getValue(binding);\r
441                 } catch (AdaptException e) {\r
442                         throw new HistoryException(e);\r
443                 }\r
444         }\r
445 \r
446         @Override\r
447         public Object getValue(String id, Binding binding) throws HistoryException {\r
448                 VariableState vs = state.values.get(id);\r
449                 if (vs==null) return null;\r
450                 \r
451                 if (!vs.isValid) return null;\r
452                 \r
453                 try {\r
454                         return vs.value.getValue(binding);\r
455                 } catch (AdaptException e) {\r
456                         throw new HistoryException(e);\r
457                 }\r
458         }\r
459         \r
460         void openAllItems() throws HistoryException {\r
461                 if (itemsOpen) return;\r
462 \r
463                 // Assert all items are open\r
464                 // 1. Collect all items that still need to be opened.\r
465                 Set<String> itemsToOpen = new THashSet<String>( state.itemStates.keySet() );\r
466                 for (ActiveStream as : items)\r
467                         itemsToOpen.remove(as.itemState.id);\r
468 \r
469                 // 2. Open missing streams\r
470                 List<ActiveStream> opened = new ArrayList<ActiveStream>(itemsToOpen.size());\r
471                 for (String itemId : itemsToOpen) {\r
472                         CollectorState.Item item = state.itemStates.get( itemId );\r
473                         ActiveStream as = openStream( item );\r
474                         opened.add(as);\r
475                 }\r
476 \r
477                 // 3. Update list of open streams\r
478                 items.addAll(opened);\r
479                 itemsOpen = true;\r
480         }\r
481 \r
482         private void updateDt(NumberBinding binding, Object time) {\r
483                 try {\r
484                         Double current = (Double)getTime(Bindings.DOUBLE);\r
485                         if(current != null) {\r
486                                 double t = binding.getValue(time).doubleValue();\r
487                                 state.dT = t-current;\r
488                                 if(state.dT > 0) return;\r
489                         }\r
490                 } catch (Throwable t) {\r
491                 }\r
492                 state.dT = 1.0;\r
493         }\r
494         \r
495         @Override\r
496         public void beginStep(NumberBinding binding, Object time) throws HistoryException {\r
497                 openAllItems();\r
498                 updateDt(binding, time);\r
499                 state.time.setValue(binding, time);\r
500         }\r
501 \r
502         @Override\r
503         public void setValue(String id, Binding binding, Object value) throws HistoryException {\r
504                 VariableState vs = state.values.get( id );\r
505                 if (vs == null) {\r
506                         vs = new VariableState();\r
507                         vs.value = new MutableVariant(binding, value);\r
508                         state.values.put(id, vs);\r
509                 }\r
510                 \r
511                 if (value==null) {\r
512                         vs.value.setValue(VoidBinding.VOID_BINDING, null);\r
513                         vs.isNan = true;\r
514                         vs.isValid = false;\r
515                         return;\r
516                 } \r
517                                         \r
518                 vs.value.setValue(binding, value);\r
519                 vs.isValid = true;\r
520                         \r
521                 if (binding instanceof DoubleBinding) {\r
522                         DoubleBinding db = (DoubleBinding) binding;\r
523                         try {\r
524                                 double _v = db.getValue_(value);\r
525                                 vs.isNan = Double.isNaN( _v );\r
526                         } catch (BindingException e) {\r
527                                 vs.isNan = true;\r
528                                 vs.isValid = false;\r
529                         }\r
530                 } else if (binding instanceof FloatBinding) {\r
531                         FloatBinding db = (FloatBinding) binding;\r
532                         try {\r
533                                 float _v = db.getValue_(value);\r
534                                 vs.isNan = Float.isNaN( _v );\r
535                         } catch (BindingException e) {\r
536                                 vs.isNan = true;\r
537                                 vs.isValid = false;\r
538                         }\r
539                 } else {\r
540                         vs.isNan = false;\r
541                 }\r
542         }\r
543 \r
544         @Override\r
545         public boolean getValue(String id, MutableVariant result) {\r
546                 VariableState vs = state.values.get(id);\r
547                 if (vs==null) return false;\r
548                 \r
549                 if (!vs.isValid) return false;\r
550                 \r
551                 result.setValue(vs.value.getBinding(), vs.value.getValue());\r
552                 return true;\r
553         }\r
554 \r
555         @Override\r
556         public void endStep() throws HistoryException {\r
557                 // Write values to streams\r
558                 for (ActiveStream i : items)\r
559                 {\r
560                         try {\r
561                                 putValueToStream(i);                            \r
562                                 if (flushPolicy == FlushPolicy.FlushOnEveryStep) {\r
563                                         i.stream.flush();\r
564                                 }\r
565                         } catch (AdaptException e) {\r
566                                 throw new HistoryException( e );\r
567 //                              FileAccessor fa = (FileAccessor) i.stream;\r
568 //                              logger.log(Level.FINE, "Error writing a stream "+fa.file(), e);\r
569                         } catch (AccessorException e) {\r
570                                 throw new HistoryException( e );\r
571 //                              FileAccessor fa = (FileAccessor) i.stream;\r
572 //                              logger.log(Level.FINE, "Error writing a stream "+fa.file(), e);\r
573                         } catch (BindingException e) {\r
574                                 throw new HistoryException( e );\r
575 //                              FileAccessor fa = (FileAccessor) i.stream;\r
576 //                              logger.log(Level.FINE, "Error writing a stream "+fa.file(), e);\r
577                         }\r
578                 }\r
579         }\r
580                 \r
581         private int sampleNum(double time, double interval) {\r
582                 // 1.0 is needed to cover the case of 0.0\r
583                 return (int)(1.0 + (time+EPSILON*state.dT)/interval);\r
584         }\r
585 \r
586         private boolean isExactInterval(double time, double interval) {\r
587                 return sampleNum(time, interval) > sampleNum(time-2*EPSILON*state.dT, interval);\r
588         }\r
589 \r
590         /**\r
591          * This method puts a time,value to a stream.\r
592          * \r
593          * The stream is "packed" when the new value is equal to previous, or the\r
594          * value is within deadband (in comparison to the first value), or if\r
595          * the interval zone is not exceeded.\r
596          * \r
597          * The stream can be packed two different ways, depending on the sample\r
598          * format. The sample format can be (a) single value or (b) a time range of\r
599          * multiple values, Ranged Sample. \r
600          * \r
601          * Single value is packed by removing middle values, in other words, \r
602          * moving the timecode of the last sample.\r
603          * \r
604          * Ranged sample is packed by extending the endTime code of the last sample.\r
605          * Other fields, such as avg, sum, count, lastValue are also updated.\r
606          * \r
607          * @param i\r
608          * @throws AdaptException\r
609          * @throws AccessorException \r
610          * @throws BindingException \r
611          * @throws HistoryException \r
612          */\r
613         private void putValueToStream(ActiveStream i) throws AdaptException, AccessorException, BindingException, HistoryException\r
614         {\r
                    // Overview of the steps below:
                    //  1. Read the current time and the current value from the collector state.
                    //  2. If the item is disabled, only track the disabled time span and return.
                    //  3. Evaluate the deadband and interval conditions and decide whether a
                    //     new band (sample) has to be started.
                    //  4. Update the running statistics (sum, median, count) and remember the
                    //     current time and value in the item state.
                    //  5. Write a null-valued marker sample if the item was disabled before this call.
                    //  6. Extend the last sample of the stream and/or append a new sample.
615                 // Current Time\r
                    // currentTime stays NaN when state.time is null or holds no value.
                    // NOTE(review): time itself is dereferenced further below (time.getBinding())
                    // without a null check - confirm state.time is never null when a sample is written.
616                 Variant time = state.time;\r
617                 double currentTime = Double.NaN;\r
618                 if (time != null) {\r
619                         Double x = (Double) time.getValue(Bindings.DOUBLE);\r
620                         currentTime = x == null ? Double.NaN : x;\r
621                 }\r
622 \r
623                 // Is first value\r
624                 boolean isFirstValue = i.stream.size() == 0;\r
625                 // Is this item disabled\r
626                 boolean isDisabled = !i.itemState.enabled; \r
                    // wasDisabled is true only if the stream already has samples and the
                    // isDisabled flag stored before this call is set.
627                 // Discontinuety, first value after being disabled\r
628                 boolean wasDisabled = !isFirstValue && i.itemState.isDisabled;\r
629                 \r
630                 // Update currently disabled status\r
631                 i.itemState.isDisabled = isDisabled;\r
632 \r
                    // While the item is disabled nothing is written to the stream; only the
                    // count and the first/last disabled time are tracked. These times are used
                    // for the discontinuation marker once the item is enabled again.
633                 // Update time codes\r
634                 if (isDisabled) {\r
635                         i.itemState.count++;\r
636                         i.itemState.lastDisabledTime = currentTime;\r
637                         if (!wasDisabled) i.itemState.firstDisabledTime = currentTime;\r
638                         return;\r
639                 }\r
640                 \r
641                 // Is nan value, for doubles and floats\r
642                 boolean isNanValue;\r
643                 // Is valid value\r
644                 boolean isValidValue;\r
645                 // Value as variant\r
646                 MutableVariant value;\r
647                 // Get current value as double\r
648                 double currentValueDouble;\r
649                 \r
650                 VariableState vs = state.values.get( i.itemState.variableId );                  \r
651                 if ( vs!=null ) {\r
652                         isNanValue = vs.isNan;\r
653                         isValidValue = vs.isValid;\r
654                         value = vs.value;\r
655                         if (i.isNumeric && isValidValue && !isNanValue) {\r
656                                 currentValueDouble = (Double) value.getValue( Bindings.DOUBLE );\r
                                    // Gain/bias scaling is applied only when it is not the identity.
                                    // The scaled double is wrapped in a new variant with the item's
                                    // value binding, so the state's own variant is not modified here.
657                                 if ( i.itemState.gain!=1.0 || i.itemState.bias!=0.0 ) {\r
658                                         currentValueDouble = i.itemState.gain * currentValueDouble + i.itemState.bias;\r
659                                         Object x = i.valueBinding instanceof DoubleBindingDefault ? currentValueDouble : i.doubleToValue.adapt(currentValueDouble);\r
660                                         value = new MutableVariant( i.valueBinding, x );\r
661                                 }\r
662                         } else {\r
663                                 currentValueDouble = Double.NaN;\r
664                         }\r
665                         \r
666                 } else {\r
                            // No state exists for this variable: treat the value as invalid and
                            // use the default value of the value binding as a placeholder.
667                         isNanValue = true;\r
668                         isValidValue = false;\r
669                         value = new MutableVariant( i.valueBinding, i.valueBinding.createDefault() );\r
670                         currentValueDouble = Double.NaN;\r
671                 }\r
672                                 \r
673                 // Prev Value\r
                    // The previous value is read as a double only if its binding is a
                    // NumberBinding; otherwise prevValueDouble is NaN.
674                 double prevValueDouble = Double.NaN;\r
675                 Variant pv = i.itemState.currentValue;\r
676                 if (pv!=null && pv.getBinding() instanceof NumberBinding ) {\r
677                         prevValueDouble = (Double) pv.getValue( Bindings.DOUBLE );\r
678                 } else {\r
679                         prevValueDouble = Double.NaN;\r
680                 }\r
681                 \r
682                 // The time of the first sample of region\r
683                 double firstTime = i.itemState.firstTime;\r
684                 // Prev time\r
685                 double prevTime = i.itemState.currentTime;\r
686                 \r
                    // restep: this call has exactly the same time as the previous one. Such a
                    // call does not increase the sample count and does not update the average.
687                 boolean restep = prevTime == currentTime;\r
688                 \r
689                 // There was change in NaN\r
690                 boolean isNaNChange = i.itemState.isNaN != isNanValue;\r
691                 \r
692                 // There was change in validity of the value\r
693                 boolean isValidChange = i.itemState.isValid != isValidValue;\r
694                 \r
695                 //boolean wasValidValue = !i.itemState.isValid && isValidValue;\r
696                 \r
                    // An interval / a deadband is in effect only if it is a number greater than
                    // Double.MIN_VALUE; the deadband additionally requires a numeric item.
697                 boolean hasInterval = !Double.isNaN( i.itemState.interval ) && (i.itemState.interval> Double.MIN_VALUE );\r
698                 \r
699                 boolean hasDeadband = !Double.isNaN( i.itemState.deadband ) && (i.itemState.deadband> Double.MIN_VALUE) && i.isNumeric; \r
700                 \r
                    // Min-max format: interval and deadband are both Double.MAX_VALUE and the
                    // sample binding has both a "min" and a "max" component.
701                 boolean isMinMaxFormat = i.itemState.interval==Double.MAX_VALUE &&\r
702                                 i.itemState.deadband==Double.MAX_VALUE &&\r
703                                 i.binding.getComponentIndex("min")>=0 &&\r
704                                 i.binding.getComponentIndex("max")>=0;\r
705 \r
                    // ooDeadband ("out of deadband") is accumulated with |= over the calls of a
                    // band: once set it stays set until it is cleared in the else branch below
                    // or when a new band is started. The comparison is made against the first
                    // value of the band; without a deadband any non-zero difference counts.
                    // If the stored state says the previous value was not valid, the deadband
                    // is considered exceeded.
706                 // Deadband\r
707                 if (isValidValue && i.isNumeric && i.itemState.count>=1) {\r
708                         \r
709                         if (i.itemState.isValid) {\r
710                                 double firstValueDouble = (Double) i.itemState.firstValue.getValue( Bindings.DOUBLE );\r
711                                 double diff = Math.abs( currentValueDouble - firstValueDouble );\r
712                                 if( hasDeadband )\r
713                                         i.itemState.ooDeadband |= diff >= i.itemState.deadband;\r
714                                 else\r
715                                         i.itemState.ooDeadband |= diff != 0.0;\r
716                         } else {\r
717                                 i.itemState.ooDeadband = true;\r
718                         }\r
719                         \r
720                         // NaN change is deadband change\r
721                         i.itemState.ooDeadband |= isNaNChange;\r
722                 } else {\r
723                         // The dead band cannot be have broken in first sample\r
724                         i.itemState.ooDeadband = false;\r
725                 }\r
726 \r
727 \r
                    // ooInterval ("out of interval") defaults to true, so an item without an
                    // interval is never held back by the interval condition.
                    // NOTE(review): sampleNum presumably maps a time to the index of its interval
                    // slot and isExactInterval tests for a slot border - both are defined
                    // outside this method, confirm.
728                 // Interval\r
729                 boolean ooInterval = true;\r
730                 if ( hasInterval ) {\r
731                         \r
732 //                      System.err.println(i.itemState.id + " " + firstTime + " " + currentTime);\r
733 \r
734                         // First band is a special case\r
735                         if(isFirstValue) {\r
736                                 // If this is the first sample store the current time as first time\r
737                                 if(Double.isNaN(i.itemState.firstTime)) firstTime = i.itemState.firstTime = currentTime;\r
738                                 int sn1 = sampleNum(firstTime,i.itemState.interval);\r
739                                 int sn2 = sampleNum(currentTime,i.itemState.interval);\r
740                                 // Wait until next sample if this position is not already exactly at sample border\r
741                                 // Always start minmax immediately\r
                                    // Returning here skips the rest of the method: nothing is written
                                    // to the stream and the remembered time/value are not updated.
742                                 if(sn1 == sn2 && !isExactInterval(currentTime, i.itemState.interval) && !isMinMaxFormat) return;\r
743                         } else {\r
                                    // The interval is exceeded when the current time has a greater
                                    // sample number than the previous time.
744                                 int sn1 = sampleNum(prevTime,i.itemState.interval);\r
745                                 int sn2 = sampleNum(currentTime,i.itemState.interval);\r
746 //                              System.err.println(" ==>" + sn1 + " " + sn2 + " " + prevTime + " " + currentTime);\r
747                                 ooInterval = sn2 > sn1;\r
748                         }\r
749 \r
750                         \r
751 //                      \r
752 //                      double n = (currentTime+EPSILON) / i.itemState.interval;\r
753 //                      double n2 = n-Math.floor(n);\r
754 //                      \r
755 //                      \r
756 //                      double deltaTime = currentTime - firstTime;\r
757 //                      ooInterval = (deltaTime - i.itemState.interval) > -EPSILON;\r
758                 }\r
759                 \r
                    // A new band is started when a valid value is out of the deadband and the
                    // interval is exceeded, and unconditionally for the first value, for the
                    // first value after being disabled, and when the validity changed.
760                 // Should new sample be created, or old sample extended / forwarded in time\r
761                 boolean makeNewBand = ( i.itemState.ooDeadband && isValidValue && ooInterval ) || isFirstValue || wasDisabled || isValidChange;\r
762                 \r
763                 // Update item min-max, avg, median\r
764                 if ( isValidValue && i.isNumeric ) {\r
                            // NOTE(review): the i.isNumeric / isValidValue checks in the two inner
                            // conditions repeat the enclosing condition and are always true here.
765                         if (i.current.hasAvg() && i.isNumeric) {\r
766                                 if (Double.isNaN( i.itemState.sum )) i.itemState.sum = 0.0;\r
767                                 if (isValidValue && i.isNumeric) {\r
                                            // sum accumulates value * elapsed time since the previous
                                            // call; it is divided by the band duration further below
                                            // to obtain the average.
768                                         double timeDelta = currentTime - i.itemState.currentTime;                                                       \r
769                                         i.itemState.sum += timeDelta * currentValueDouble;\r
770                                 }\r
771                         }\r
772 \r
                            // The median accumulator receives the previous value, weighted by the
                            // time elapsed since the previous call.
773                         if (i.current.hasMedian() && !Double.isNaN(prevValueDouble) ) {\r
774                                 double deltaTime = currentTime - prevTime;\r
775                                 i.itemState.median.add( deltaTime, prevValueDouble );\r
776                         }\r
777                 }\r
778 \r
779                 // Increase the counter that tracks the number of samples acquired into the band\r
780                 if (!restep) i.itemState.count++;\r
781 \r
782                 // Remember values \r
783                 i.itemState.currentTime = currentTime;\r
784                 i.itemState.currentValue.readFrom( value.getBinding(), value.getValue() );                                                      \r
785                 i.itemState.isNaN = isNanValue;\r
786                 i.itemState.isValid = isValidValue;\r
787                 \r
                    // First value after a disabled period: if the sample format supports null
                    // values, write a null-valued marker sample that spans the disabled time.
                    // NOTE(review): when the last sample already is a null marker it is extended
                    // and written back, and then a new marker is appended as well - confirm
                    // that appending in that case is intended.
788                 // Put discontinuation marker\r
789                 if ( (wasDisabled && i.current.supportsNullValue())) {\r
790                         // Extend marker\r
791                         if (i.current.isNullValue()) {\r
792                                 if (i.current.hasEndTime()) {\r
793                                         i.current.setEndTime( Bindings.DOUBLE, i.itemState.lastDisabledTime );\r
794                                 }\r
795                                 \r
796                                 if (i.current.hasCount()) {\r
797                                         i.current.setCount( i.current.getCount()+1 );\r
798                                 }\r
799                                 \r
800                                 // Add the band to the file stream\r
801                                 i.stream.setNoflush(i.stream.size()-1, i.binding, i.current.getSample() );\r
802                         }\r
803                         \r
804                         // Add marker\r
805                         i.current.reset();\r
806                         i.current.setValueNull();\r
807                         i.current.setTime(Bindings.DOUBLE, i.itemState.firstDisabledTime);\r
808                         i.current.setEndTime(Bindings.DOUBLE, i.itemState.lastDisabledTime);\r
809                         i.stream.addNoflush( i.binding, i.current.getSample() );\r
810                 }\r
811                 \r
                    // Besides makeNewBand, a new sample is also appended when the sample format
                    // has no end time and the count of the band has just reached 2.
812                 boolean condition = makeNewBand || (!i.hasEndTime && i.itemState.count==2); \r
813 \r
                    // The last sample of the stream is updated in place whenever the stream is
                    // not empty and the value has stayed within the deadband. This happens
                    // independently of whether a new sample is appended afterwards.
814                 if(i.stream.size() > 0 && !i.itemState.ooDeadband) {\r
815 \r
816                         /// Extend existing band\r
817                         // Extend endTime value\r
                            // Without an end time field the time of the sample itself is moved forward.
818                         if (!i.hasEndTime) {\r
819                                 i.current.setTime( time.getBinding(), time.getValue() );\r
820                         } else {\r
821                                 i.current.setEndTime( time.getBinding(), time.getValue() );\r
822                         }\r
823 \r
824                         if (isValidValue) {\r
825                                 // Set last value\r
826                                 if (i.current.hasLastValue()) {\r
827                                         i.current.setLastValue( value.getBinding(), value.getValue() );\r
828                                 }\r
829 \r
830                                 // Add sum\r
831                                 if (i.current.hasAvg() && i.isNumeric && !isNanValue && !restep) {\r
                                            // Average = accumulated sum / band duration; a duration
                                            // below 1e-9 yields 0.0 instead of dividing.
832                                         double duration = (currentTime - i.itemState.firstTime);\r
833                                         if(duration < 1e-9) {\r
834                                                 i.current.setAvg( Bindings.DOUBLE, 0.0 );                                               \r
835                                         } else {\r
836                                                 i.current.setAvg( Bindings.DOUBLE, i.itemState.sum / duration);                                         \r
837                                         }\r
838                                 }\r
839 \r
840                                 // Update min-max\r
                                    // min/max are compared and stored using the binding of the
                                    // respective field of the sample.
841                                 if (i.current.hasMin() && i.isNumeric && !isNanValue) {\r
842                                         Binding minBinding = i.current.getMinBinding();\r
843                                         Object prevMinValue = i.current.getMin();\r
844                                         Object currentValueWithMinBinding = value.getValue( minBinding );\r
845                                         int diff = minBinding.compare( prevMinValue, currentValueWithMinBinding );\r
846                                         if (diff>0) i.current.setMin( minBinding, currentValueWithMinBinding );\r
847                                 }                               \r
848                                 if (i.current.hasMax() && i.isNumeric && !isNanValue) {\r
849                                         Binding maxBinding = i.current.getMaxBinding();\r
850                                         Object prevMaxValue = i.current.getMax();\r
851                                         Object currentValueWithMaxBinding = value.getValue( maxBinding );\r
852                                         int diff = maxBinding.compare( prevMaxValue, currentValueWithMaxBinding );\r
853                                         if (diff<0) i.current.setMax( maxBinding, currentValueWithMaxBinding );\r
854                                 }\r
855 \r
856                                 // Update median\r
857                                 if (i.current.hasMedian() && i.isNumeric && !isNanValue) {\r
858                                         Double median = i.itemState.median.getMedian();\r
859                                         i.current.setMedian( Bindings.DOUBLE, median);\r
860                                 }                               \r
861                         } else {\r
862                                 i.current.setValueNull();\r
863                         }\r
864 \r
865                         // Add count\r
866                         if (i.current.hasCount()) {\r
867                                 i.current.setCount( i.itemState.count );\r
868                         }\r
869 \r
870                         // Write entry to file stream\r
871                         //              if (i.current.getTimeDouble()<0) \r
872                         //                      System.err.println("Error sample "+i.current.getBinding().toString(i.current.getSample()));\r
                            // Overwrites the last sample of the stream; the stream is not flushed here.
873                         i.stream.setNoflush( i.stream.size()-1, i.binding, i.current.getSample() );                             \r
874                 }\r
875                 \r
876                 if (condition) {\r
877                         \r
                            // Reset the band state: the current value and time become the first
                            // value and time of the new band, and sum, count and ooDeadband start over.
878                         // Start a new value band\r
879                         i.itemState.firstValue.readFrom( value.getBinding(), value.getValue() );\r
880                         i.itemState.firstTime = currentTime;\r
881                         i.itemState.currentValue.readFrom( value.getBinding(), value.getValue() );\r
882                         i.itemState.currentTime = currentTime;\r
883                         i.itemState.isNaN = isNanValue;\r
884                         i.itemState.isValid = isValidValue;\r
885                         i.itemState.sum = 0;\r
886                         i.itemState.count = 1;\r
887                         i.itemState.ooDeadband = false;\r
                            // The cleared median accumulator is seeded with the previous value,
                            // weighted by the time elapsed since the previous call.
888                         if (i.itemState.median != null) {\r
889                                 i.itemState.median.clear();\r
890                                 if (!Double.isNaN(prevValueDouble) ) {\r
891                                         double deltaTime = currentTime - prevTime;\r
892                                         i.itemState.median.add( deltaTime, prevValueDouble );\r
893                                 }\r
894                         }\r
895                         \r
896                         // New Sample\r
897                         i.current.reset();\r
898                         \r
899                         // Start new band with current value & time\r
900                         i.current.setTime( time.getBinding(), time.getValue() );                \r
901 \r
902                         // Is valid value\r
903                         if (i.current.hasEndTime()) {\r
904                                 i.current.setEndTime( time.getBinding(), time.getValue() );\r
905                         }\r
906                         \r
                            // NOTE(review): unlike in the extend branch above, the setters below
                            // are called without has*() checks; presumably ValueBand ignores
                            // fields that the sample format does not have - confirm.
907                         if (isValidValue) {\r
908                                 i.current.setValue( value.getBinding(), value.getValue() );\r
909                                 i.current.setMax( value.getBinding(), value.getValue() );\r
910                                 i.current.setMin( value.getBinding(), value.getValue() );\r
911                                 i.current.setLastValue( value.getBinding(), value.getValue() );\r
912                                 i.current.setAvg( value.getBinding(), value.getValue() );\r
913                                 i.current.setMedian( value.getBinding(), value.getValue() );                                    \r
914                         } else {\r
915                                 i.current.setValueNull();\r
916                         }\r
917                         \r
918                                 \r
919                         // Sample Count=1 for new bands (time, value) "added"\r
920                         if (i.current.hasCount()) {\r
921                                 i.current.setCount( 1 );\r
922                         }\r
923                         \r
924                         // Add the band to the file stream\r
                            // Appends the sample to the stream; the stream is not flushed here.
925                         i.stream.addNoflush( i.binding, i.current.getSample() );\r
926                         \r
927                 }               \r
928                 \r
929         }\r
930 \r
931         @Override\r
932         public void flush() /*throws HistoryException*/ {\r
933                 for (ActiveStream i : items) {\r
934                         try {                                           \r
935                                 i.stream.flush();\r
936                         } catch (AccessorException e) {\r
937                                 logger.log(Level.FINE, "File history flush failed.", e);                        \r
938 //                              throw new HistoryException(e);\r
939                         }\r
940                 }\r
941         }\r
942 \r
943         public void flush(Set<String> ids) /*throws HistoryException*/ {\r
944                 for (ActiveStream i : items) {\r
945                         if (!ids.contains(i.itemState.id)) continue;\r
946                         try {                                           \r
947                                 i.stream.flush();\r
948                         } catch (AccessorException e) {\r
949                                 logger.log(Level.FINE, "File history flush failed.", e);                        \r
950 //                              throw new HistoryException(e);\r
951                         }\r
952                 }\r
953         }\r
954         \r
955         \r
956         @Override\r
957         public synchronized void close()\r
958         {\r
959                 // Write items back to history\r
960                 List<Bean> beans = new ArrayList<Bean>( state.itemStates.size() );\r
961                 for (CollectorState.Item itemState : state.itemStates.values()) {\r
962                         try {\r
963                                 if (!history.exists(itemState.id)) continue;\r
964                                 Bean bean = history.getItem(itemState.id);\r
965                                 bean.readAvailableFields(itemState);\r
966                                 beans.add( setItemState(bean, itemState) );\r
967                         } catch (BindingException e) {\r
968                                 logger.log(Level.FINE, "Error writing a meta-data to history", e);\r
969                         } catch (BindingConstructionException e) {\r
970                                 logger.log(Level.FINE, "Error writing a meta-data to history", e);\r
971                         } catch (HistoryException e) {\r
972                                 logger.log(Level.FINE, "Error writing a meta-data to history", e);\r
973                         }\r
974                 }\r
975                 try {\r
976                         history.modify(beans.toArray( new Bean[beans.size()] ));\r
977                 } catch (HistoryException e) {\r
978                         logger.log(Level.FINE, "Error writing a meta-data to history", e);\r
979                 }\r
980                 state.itemStates.clear();\r
981                 \r
982                 // Close streams\r
983                 for (ActiveStream item : items) {\r
984                         item.close();\r
985                 }\r
986                 items.clear();\r
987         }\r
988 \r
989         public class ActiveStream {\r
990 \r
991                 /** Sample Stream */\r
992                 public StreamAccessor stream;\r
993                 \r
994                 /** Sample binding */\r
995                 public RecordBinding binding;\r
996                 \r
997                 /** Value binding (Generic) */\r
998                 public Binding valueBinding;\r
999                 \r
1000                 /** Set true if this item has numeric value type */\r
1001                 public boolean isNumeric;\r
1002                 \r
1003                 /** Set true if this item has end time */\r
1004                 public boolean hasEndTime;\r
1005                 \r
1006                 /** Item entry in subscription state */\r
1007                 public CollectorState.Item itemState;\r
1008                                 \r
1009                 /** Interface to current last sample (in the stream) */\r
1010                 public ValueBand current;\r
1011                 \r
1012                 /** Adapter that converts value to double value, null if not numeric */\r
1013                 public Adapter valueToDouble, doubleToValue;\r
1014                 \r
1015                 void close() {\r
1016                         try {\r
1017                                 if (stream != null) { \r
1018                                         stream.close();\r
1019                                 }\r
1020                                 stream = null;\r
1021                         } catch (AccessorException e) {\r
1022                                 logger.log(Level.FINE, "Error closing stream", e);\r
1023                         }\r
1024                 }\r
1025 \r
1026         }\r
1027 \r
1028         @Override\r
1029         public StreamAccessor openStream(String itemId, String mode) \r
1030         throws HistoryException \r
1031         {\r
1032                 return history.openStream(itemId, mode);\r
1033         }\r
1034 \r
1035         @Override\r
1036         public HistoryManager getHistory() {\r
1037                 return history;\r
1038         }\r
1039 \r
1040 }\r