Acorn: Fix WriteRunnable.runReally() and other fixes
[simantics/platform.git] / bundles / org.simantics.acorn / src / org / simantics / acorn / GraphClientImpl2.java
1 /*******************************************************************************
2  * Copyright (c) 2007, 2010 Association for Decentralized Information Management
3  * in Industry THTH ry.
4  * All rights reserved. This program and the accompanying materials
5  * are made available under the terms of the Eclipse Public License v1.0
6  * which accompanies this distribution, and is available at
7  * http://www.eclipse.org/legal/epl-v10.html
8  *
9  * Contributors:
10  *     VTT Technical Research Centre of Finland - initial API and implementation
11  *******************************************************************************/
12 package org.simantics.acorn;
13
14 import java.io.IOException;
15 import java.nio.file.Path;
16 import java.util.ArrayList;
17 import java.util.LinkedList;
18 import java.util.concurrent.ExecutorService;
19 import java.util.concurrent.Executors;
20 import java.util.concurrent.Semaphore;
21 import java.util.concurrent.ThreadFactory;
22 import java.util.concurrent.TimeUnit;
23
24 import org.simantics.acorn.exception.AcornAccessVerificationException;
25 import org.simantics.acorn.exception.IllegalAcornStateException;
26 import org.simantics.acorn.internal.ClusterChange;
27 import org.simantics.acorn.internal.ClusterUpdateProcessorBase;
28 import org.simantics.acorn.internal.UndoClusterUpdateProcessor;
29 import org.simantics.acorn.lru.ClusterInfo;
30 import org.simantics.acorn.lru.ClusterStreamChunk;
31 import org.simantics.acorn.lru.ClusterUpdateOperation;
32 import org.simantics.acorn.lru.ClusterChangeSet.Entry;
33 import org.simantics.db.ClusterCreator;
34 import org.simantics.db.Database;
35 import org.simantics.db.ServiceLocator;
36 import org.simantics.db.common.utils.Logger;
37 import org.simantics.db.exception.DatabaseException;
38 import org.simantics.db.exception.SDBException;
39 import org.simantics.db.server.ProCoreException;
40 import org.simantics.db.service.ClusterSetsSupport;
41 import org.simantics.db.service.ClusterUID;
42 import org.simantics.db.service.LifecycleSupport;
43 import org.simantics.utils.datastructures.Pair;
44 import org.simantics.utils.logging.TimeLogger;
45
46 import gnu.trove.map.hash.TLongObjectHashMap;
47
48 public class GraphClientImpl2 implements Database.Session {
49         
50         public static final boolean DEBUG = false;
51
52         public final ClusterManager clusters;
53         
54         private TransactionManager transactionManager = new TransactionManager();
55         private ExecutorService executor = Executors.newSingleThreadExecutor(new ClientThreadFactory("Core Main Program", false));
56         private ExecutorService saver = Executors.newSingleThreadExecutor(new ClientThreadFactory("Core Snapshot Saver", true));
57
58         private Path dbFolder;
59         private final Database database;
60         private ServiceLocator locator;
61         private MainProgram mainProgram;
62
63         static class ClientThreadFactory implements ThreadFactory {
64                 
65                 final String name;
66                 final boolean daemon;
67                 
68                 public ClientThreadFactory(String name, boolean daemon) {
69                         this.name = name;
70                         this.daemon = daemon;
71                 }
72                 
73                 @Override
74                 public Thread newThread(Runnable r) {
75                         Thread thread = new Thread(r, name);
76                         thread.setDaemon(daemon);
77                         return thread;
78                 }
79         }
80
81         public GraphClientImpl2(Database database, Path dbFolder, ServiceLocator locator) throws IOException {
82             this.database = database;
83             this.dbFolder = dbFolder;
84             this.locator = locator;
85             this.clusters = new ClusterManager(dbFolder);
86             load();
87             ClusterSetsSupport cssi = locator.getService(ClusterSetsSupport.class); 
88             cssi.setReadDirectory(clusters.lastSessionDirectory);
89             mainProgram = new MainProgram(this, clusters);
90             executor.execute(mainProgram);
91         }
92
93         public Path getDbFolder() {
94             return dbFolder;
95         }
96
97         public void tryMakeSnapshot() throws IOException {
98                 
99             if (isClosing || unexpectedClose)
100                 return;
101             
102                 saver.execute(new Runnable() {
103
104                         @Override
105                         public void run() {
106                                 Transaction tr = null;
107                                 try {
108                                         // First take a write transaction
109                                         tr = askWriteTransaction(-1);
110                                         // Then make sure that MainProgram is idling
111                                         mainProgram.mutex.acquire();
112                                         try {
113                                                 synchronized(mainProgram) {
114                                                         if(mainProgram.operations.isEmpty()) {
115                                                                 makeSnapshot(false);
116                                                         } else {
117                                                                 // MainProgram is becoming busy again - delay snapshotting
118                                                                 return;
119                                                         }
120                                                 }
121                                         } finally {
122                                                 mainProgram.mutex.release();
123                                         }
124                                 } catch (IllegalAcornStateException | ProCoreException e) {
125                                         Logger.defaultLogError(e);
126                                         unexpectedClose = true;
127                                 } catch (InterruptedException e) {
128                                         Logger.defaultLogError(e);
129                                 } finally {
130                                         try {
131                                                 if(tr != null)
132                                                         endTransaction(tr.getTransactionId());
133                                                 if (unexpectedClose) {
134                                                 LifecycleSupport support = getServiceLocator().getService(LifecycleSupport.class);
135                             try {
136                                 support.close();
137                             } catch (DatabaseException e1) {
138                                 Logger.defaultLogError(e1);
139                             }
140                                                 }
141                                         } catch (ProCoreException e) {
142                                                 Logger.defaultLogError(e);
143                                         }
144                                 }
145                         }
146                 });
147         }
148         
149     public void makeSnapshot(boolean fullSave) throws IllegalAcornStateException {
150         clusters.makeSnapshot(locator, fullSave);
151     }
152         
153         public <T> T clone(ClusterUID uid, ClusterCreator creator) throws DatabaseException {
154             try {
155             return clusters.clone(uid, creator);
156         } catch (AcornAccessVerificationException | IllegalAcornStateException | IOException e) {
157             unexpectedClose = true;
158             throw new DatabaseException(e);
159         }
160         }
161
162 //      private void save() throws IOException {
163 //              clusters.save();
164 //      }
165         
166         public void load() throws IOException {
167                 clusters.load();
168         }
169         
170 //      public void modiFileEx(ClusterUID uid, int resourceKey, long offset, long size, byte[] bytes, long pos, ClusterSupport support) {
171 //              clusters.modiFileEx(uid, resourceKey, offset, size, bytes, pos, support);
172 //      }
173
174         @Override
175         public Database getDatabase() {
176                 return database;
177         }
178
179         private boolean closed = false;
180         private boolean isClosing = false;
181         private boolean unexpectedClose = false;
182         
183         @Override
184         public void close() throws ProCoreException {
185             System.err.println("Closing " + this + " and mainProgram " + mainProgram);
186                 if(!closed && !isClosing) {
187                     isClosing = true;
188                         try {
189                             if (!unexpectedClose)
190                                 makeSnapshot(true);
191                                 
192                                 mainProgram.close();
193                                 clusters.shutdown();
194                                 executor.shutdown();
195                                 saver.shutdown();
196                                 boolean executorTerminated = executor.awaitTermination(500, TimeUnit.MILLISECONDS);
197                                 boolean saverTerminated = saver.awaitTermination(500, TimeUnit.MILLISECONDS);
198                                 
199                                 System.err.println("executorTerminated=" + executorTerminated + ", saverTerminated=" + saverTerminated);
200                                 
201                                 mainProgram = null;
202                                 executor = null;
203                                 saver = null;
204                                 
205                         } catch (IllegalAcornStateException | InterruptedException e) {
206                                 throw new ProCoreException(e);
207                         }
208                         closed = true;
209                 }
210                 //impl.close();
211         }
212
213         @Override
214         public void open() throws ProCoreException {
215                 throw new UnsupportedOperationException();
216         }
217
218         @Override
219         public boolean isClosed() throws ProCoreException {
220                 return closed;
221         }
222         
223         @Override
224         public void acceptCommit(long transactionId, long changeSetId, byte[] metadata) throws ProCoreException {
225                 clusters.state.headChangeSetId++;
226                 long committedChangeSetId = changeSetId + 1;
227                 try {
228                 clusters.commitChangeSet(committedChangeSetId, metadata);
229                 
230                 clusters.state.transactionId = transactionId;
231                 
232                 mainProgram.committed();
233                 
234                 TimeLogger.log("Accepted commit");
235                 } catch (IllegalAcornStateException e) {
236                     throw new ProCoreException(e);
237                 }
238         }
239
240         @Override
241         public long cancelCommit(long transactionId, long changeSetId, byte[] metadata, OnChangeSetUpdate onChangeSetUpdate) throws ProCoreException {
242             UnsupportedOperationException e = new UnsupportedOperationException("org.simantics.acorn.GraphClientImpl2.cancelCommit() is not supported operation! Closing down to prevent further havoc");
243             clusters.notSafeToMakeSnapshot(new IllegalAcornStateException(e));
244             throw e;
245 //          System.err.println("GraphClientImpl2.cancelCommit() called!! this is experimental and might cause havoc!");
246 //          try {
247 //            undo(new long[] {changeSetId}, onChangeSetUpdate);
248 //        } catch (SDBException e) {
249 //            e.printStackTrace();
250 //            throw new ProCoreException(e);
251 //        }
252 //          clusters.state.headChangeSetId++;
253 //          return clusters.state.headChangeSetId;
254         }
255
256         @Override
257         public Transaction askReadTransaction() throws ProCoreException {
258                 return transactionManager.askReadTransaction();
259         }
260
261         enum TransactionState {
262                 IDLE,WRITE,READ
263         }
264         
265         class TransactionRequest {
266                 public TransactionState state;
267                 public Semaphore semaphore;
268                 public TransactionRequest(TransactionState state, Semaphore semaphore) {
269                         this.state = state;
270                         this.semaphore = semaphore;
271                 }
272         }
273
274         class TransactionManager {
275
276                 private TransactionState currentTransactionState = TransactionState.IDLE;
277                 
278                 private int reads = 0;
279                 
280                 LinkedList<TransactionRequest> requests = new LinkedList<TransactionRequest>();
281                 
282                 TLongObjectHashMap<TransactionRequest> requestMap = new TLongObjectHashMap<TransactionRequest>();
283                 
284                 private synchronized Transaction makeTransaction(TransactionRequest req) {
285                         
286                         final int csId = clusters.state.headChangeSetId;
287                         final long trId = clusters.state.transactionId+1;
288                         requestMap.put(trId, req);
289                         return new Transaction() {
290                                 
291                                 @Override
292                                 public long getTransactionId() {
293                                         return trId;
294                                 }
295                                 
296                                 @Override
297                                 public long getHeadChangeSetId() {
298                                         return csId;
299                                 }
300                         };
301                 }
302                 
303                 /*
304                  * This method cannot be synchronized since it waits and must support multiple entries
305                  * by query thread(s) and internal transactions such as snapshot saver
306                  */
307                 public Transaction askReadTransaction() throws ProCoreException {
308                 
309                         Semaphore semaphore = new Semaphore(0);
310                         
311                         TransactionRequest req = queue(TransactionState.READ, semaphore);
312                         
313                         try {
314                                 semaphore.acquire();
315                         } catch (InterruptedException e) {
316                                 throw new ProCoreException(e);
317                         }
318                         
319                         return makeTransaction(req);
320
321                 }
322
323                 private synchronized void dispatch() {
324                         TransactionRequest r = requests.removeFirst();
325                         if(r.state == TransactionState.READ) reads++;
326                         r.semaphore.release();
327                 }
328                 
329                 private synchronized void processRequests() {
330                         
331                         while(true) {
332
333                                 if(requests.isEmpty()) return;
334                                 TransactionRequest req = requests.peek();
335
336                                 if(currentTransactionState == TransactionState.IDLE) {
337                                 
338                                         // Accept anything while IDLE
339                                         currentTransactionState = req.state;
340                                         dispatch();
341                                         
342                                 } else if (currentTransactionState == TransactionState.READ) {
343                                         
344                                         if(req.state == currentTransactionState) {
345
346                                                 // Allow other reads
347                                                 dispatch();
348
349                                         } else {
350                                                 
351                                                 // Wait
352                                                 return;
353                                                 
354                                         }
355                                         
356                                 }  else if (currentTransactionState == TransactionState.WRITE) {
357
358                                         // Wait
359                                         return;
360                                         
361                                 }
362                                 
363                         }
364                         
365                 }
366                 
367                 private synchronized TransactionRequest queue(TransactionState state, Semaphore semaphore) {
368                         TransactionRequest req = new TransactionRequest(state, semaphore); 
369                         requests.addLast(req);
370                         processRequests();
371                         return req;
372                 }
373                 
374                 /*
375                  * This method cannot be synchronized since it waits and must support multiple entries
376                  * by query thread(s) and internal transactions such as snapshot saver
377                  */
378                 public Transaction askWriteTransaction() throws IllegalAcornStateException {
379                         
380                         Semaphore semaphore = new Semaphore(0);
381                         TransactionRequest req = queue(TransactionState.WRITE, semaphore);
382                         
383                         try {
384                                 semaphore.acquire();
385                         } catch (InterruptedException e) {
386                                 throw new IllegalAcornStateException(e);
387                         }
388                         mainProgram.startTransaction(clusters.state.headChangeSetId+1);
389                         return makeTransaction(req);
390                 }
391                 
392                 public synchronized long endTransaction(long transactionId) throws ProCoreException {
393                         
394                         TransactionRequest req = requestMap.remove(transactionId);
395                         if(req.state == TransactionState.WRITE) {
396                                 currentTransactionState = TransactionState.IDLE;
397                                 processRequests();
398                         } else {
399                                 reads--;
400                                 if(reads == 0) {
401                                         currentTransactionState = TransactionState.IDLE;
402                                         processRequests();
403                                 }
404                         }
405                         return clusters.state.transactionId;
406                 }
407
408         }
409         
410         @Override
411         public Transaction askWriteTransaction(final long transactionId) throws ProCoreException {
412                 try {
413                     if (isClosing || unexpectedClose || closed) {
414                         throw new ProCoreException("GraphClientImpl2 is already closing so no more write transactions allowed!");
415                     }
416             return transactionManager.askWriteTransaction();
417         } catch (IllegalAcornStateException e) {
418             throw new ProCoreException(e);
419         }
420         }
421
422         @Override
423         public long endTransaction(long transactionId) throws ProCoreException {
424                 return transactionManager.endTransaction(transactionId);
425         }
426
427         @Override
428         public String execute(String command) throws ProCoreException {
429                 // This is called only by WriteGraphImpl.commitAccessorChanges
430                 // We can ignore this in Acorn
431                 return "";
432         }
433
434         @Override
435         public byte[] getChangeSetMetadata(long changeSetId) throws ProCoreException {
436                 try {
437             return clusters.getMetadata(changeSetId);
438         } catch (AcornAccessVerificationException | IllegalAcornStateException e) {
439             throw new ProCoreException(e);
440         }
441         }
442
443         @Override
444         public ChangeSetData getChangeSetData(long minChangeSetId,
445                         long maxChangeSetId, OnChangeSetUpdate onChangeSetupate)
446                         throws ProCoreException {
447                 
448                 new Exception("GetChangeSetDataFunction " + minChangeSetId + " " + maxChangeSetId).printStackTrace();;
449                 return null;
450                 
451         }
452
453         @Override
454         public ChangeSetIds getChangeSetIds() throws ProCoreException {
455                 throw new UnsupportedOperationException();
456         }
457
458         @Override
459         public Cluster getCluster(byte[] clusterId) throws ProCoreException {
460                 throw new UnsupportedOperationException();
461         }
462
463         @Override
464         public ClusterChanges getClusterChanges(long changeSetId, byte[] clusterId)
465                         throws ProCoreException {
466                 throw new UnsupportedOperationException();
467         }
468
469         @Override
470         public ClusterIds getClusterIds() throws ProCoreException {
471                 try {
472             return clusters.getClusterIds();
473         } catch (IllegalAcornStateException e) {
474             throw new ProCoreException(e);
475         }
476         }
477
478         @Override
479         public Information getInformation() throws ProCoreException {
480                 return new Information() {
481
482                         @Override
483                         public String getServerId() {
484                                 return "server";
485                         }
486
487                         @Override
488                         public String getProtocolId() {
489                                 return "";
490                         }
491
492                         @Override
493                         public String getDatabaseId() {
494                                 return "database";
495                         }
496
497                         @Override
498                         public long getFirstChangeSetId() {
499                                 return 0;
500                         }
501                         
502                 };
503         }
504
505         @Override
506         public Refresh getRefresh(long changeSetId) throws ProCoreException {
507                 
508                 final ClusterIds ids = getClusterIds();
509                 
510                 return new Refresh() {
511
512                         @Override
513                         public long getHeadChangeSetId() {
514                                 return clusters.state.headChangeSetId;
515                         }
516
517                         @Override
518                         public long[] getFirst() {
519                                 return ids.getFirst();
520                         }
521
522                         @Override
523                         public long[] getSecond() {
524                                 return ids.getSecond();
525                         }
526                         
527                 };
528                 
529         }
530
531         public byte[] getResourceFile(final byte[] clusterUID, final int resourceIndex) throws ProCoreException, AcornAccessVerificationException, IllegalAcornStateException {
532                 return clusters.getResourceFile(clusterUID, resourceIndex);
533         }
534
535         @Override
536         public ResourceSegment getResourceSegment(final byte[] clusterUID, final int resourceIndex, final long segmentOffset, short segmentSize) throws ProCoreException {
537                 try {
538             return clusters.getResourceSegment(clusterUID, resourceIndex, segmentOffset, segmentSize);
539         } catch (AcornAccessVerificationException | IllegalAcornStateException e) {
540             throw new ProCoreException(e);
541         }
542         }
543
544         @Override
545         public long reserveIds(int count) throws ProCoreException {
546                 return clusters.state.reservedIds++;
547         }
548
549         @Override
550         public void updateCluster(byte[] operations) throws ProCoreException {
551             ClusterInfo info = null;
552             try {
553                 ClusterUpdateOperation operation = new ClusterUpdateOperation(clusters, operations);
554                 info = clusters.clusterLRU.getOrCreate(operation.uid, true);
555                 if(info == null)
556                     throw new IllegalAcornStateException("info == null for operation " + operation);
557                 info.acquireMutex();
558                         info.scheduleUpdate();
559                         mainProgram.schedule(operation);
560                 } catch (IllegalAcornStateException | AcornAccessVerificationException e) {
561             throw new ProCoreException(e);
562         } finally {
563             if (info != null)
564                 info.releaseMutex();
565                 }
566         }
567
568         private UndoClusterUpdateProcessor getUndoCSS(String ccsId) throws DatabaseException, AcornAccessVerificationException, IllegalAcornStateException {
569
570                 String[] ss = ccsId.split("\\.");
571                 String chunkKey = ss[0];
572                 int chunkOffset = Integer.parseInt(ss[1]);
573                 ClusterStreamChunk chunk = clusters.streamLRU.getWithoutMutex(chunkKey);
574                 if(chunk == null) throw new IllegalAcornStateException("Cluster Stream Chunk " + chunkKey + " was not found.");
575                 chunk.acquireMutex();
576                 try {
577                         return chunk.getUndoProcessor(clusters, chunkOffset, ccsId);
578                 } catch (DatabaseException e) {
579                     throw e;
580                 } catch (Throwable t) {
581                         throw new IllegalStateException(t);
582                 } finally {
583                         chunk.releaseMutex();
584                 }
585         }
586         
587         private void performUndo(String ccsId, ArrayList<Pair<ClusterUID, byte[]>> clusterChanges, UndoClusterSupport support) throws ProCoreException, DatabaseException, IllegalAcornStateException, AcornAccessVerificationException {
588                 UndoClusterUpdateProcessor proc = getUndoCSS(ccsId);
589
590                 int clusterKey = clusters.getClusterKeyByClusterUIDOrMakeWithoutMutex(proc.getClusterUID());
591
592                 clusters.clusterLRU.acquireMutex();
593                 try {
594
595                         ClusterChange cs = new ClusterChange(clusterChanges, proc.getClusterUID());
596                         for(int i=0;i<proc.entries.size();i++) {
597                                 
598                                 Entry e = proc.entries.get(proc.entries.size() - 1 - i);
599                                 e.process(clusters, cs, clusterKey);
600                         }
601                         cs.flush();
602
603                 } finally {
604                         clusters.clusterLRU.releaseMutex();
605                 }
606         }
607         
608         @Override
609         public boolean undo(long[] changeSetIds, OnChangeSetUpdate onChangeSetUpdate) throws SDBException {
610             try {
611                 final ArrayList<Pair<ClusterUID, byte[]>> clusterChanges = new ArrayList<Pair<ClusterUID, byte[]>>();
612                 
613                 UndoClusterSupport support = new UndoClusterSupport(clusters);
614                 
615                 final int changeSetId = clusters.state.headChangeSetId;
616                 
617                 if(ClusterUpdateProcessorBase.DEBUG)
618                         System.err.println(" === BEGIN UNDO ===");
619                 
620                 for(int i=0;i<changeSetIds.length;i++) {
621                         final long id = changeSetIds[changeSetIds.length-1-i];
622                         ArrayList<String> ccss = clusters.getChanges(id);
623     
624                         for(int j=0;j<ccss.size();j++) {
625                                 try {
626                                         if(ClusterUpdateProcessorBase.DEBUG)
627                                                 System.err.println("performUndo " + ccss.get(ccss.size()-j-1));
628                                         performUndo(ccss.get(ccss.size()-j-1), clusterChanges, support);
629                                 } catch (DatabaseException e) {
630                                         e.printStackTrace();
631                                 }
632                         }
633                 }
634     
635                 if(ClusterUpdateProcessorBase.DEBUG)
636                         System.err.println(" === END UNDO ===");
637     
638                 for(int i=0;i<clusterChanges.size();i++) {
639                         
640                         final int changeSetIndex = i;
641                         
642                         final Pair<ClusterUID, byte[]> pair = clusterChanges.get(i);
643                         
644                         final ClusterUID cuid = pair.first;
645                         final byte[] data = pair.second;
646     
647                         onChangeSetUpdate.onChangeSetUpdate(new ChangeSetUpdate() {
648     
649                                 @Override
650                                 public long getChangeSetId() {
651                                         return changeSetId;
652                                 }
653     
654                                 @Override
655                                 public int getChangeSetIndex() {
656                                         return 0;
657                                 }
658     
659                                 @Override
660                                 public int getNumberOfClusterChangeSets() {
661                                         return clusterChanges.size();
662                                 }
663     
664                                 @Override
665                                 public int getIndexOfClusterChangeSet() {
666                                         return changeSetIndex;
667                                 }
668     
669                                 @Override
670                                 public byte[] getClusterId() {
671                                         return cuid.asBytes();
672                                 }
673     
674                                 @Override
675                                 public boolean getNewCluster() {
676                                         return false;
677                                 }
678     
679                                 @Override
680                                 public byte[] getData() {
681                                         return data;
682                                 }
683     
684                         });
685                 }
686         } catch (AcornAccessVerificationException | IllegalAcornStateException e1) {
687             throw new ProCoreException(e1);
688         }
689                 return false;
690         }
691         
692         public ServiceLocator getServiceLocator() {
693             return locator;
694         }
695
696     @Override
697     public boolean refreshEnabled() {
698         return false;
699     }
700
701     @Override
702     public boolean rolledback() {
703         return clusters.rolledback();
704     }
705
706         
707         
708         
709         
710         
711         
712         
713         
714         
715         
716         ////////////////////////
717         
718         
719         
720         
721         
722         
723         
724         
725         
726         
727         
728         
729 }
730