]> gerrit.simantics Code Review - simantics/platform.git/blob - bundles/org.simantics.acorn/src/org/simantics/acorn/GraphClientImpl2.java
Make Write-interfaces as @FunctionalInterface for lambdas
[simantics/platform.git] / bundles / org.simantics.acorn / src / org / simantics / acorn / GraphClientImpl2.java
1 /*******************************************************************************
2  * Copyright (c) 2007, 2010 Association for Decentralized Information Management
3  * in Industry THTH ry.
4  * All rights reserved. This program and the accompanying materials
5  * are made available under the terms of the Eclipse Public License v1.0
6  * which accompanies this distribution, and is available at
7  * http://www.eclipse.org/legal/epl-v10.html
8  *
9  * Contributors:
10  *     VTT Technical Research Centre of Finland - initial API and implementation
11  *******************************************************************************/
12 package org.simantics.acorn;
13
14 import java.io.IOException;
15 import java.nio.file.Path;
16 import java.util.ArrayList;
17 import java.util.LinkedList;
18 import java.util.concurrent.ExecutorService;
19 import java.util.concurrent.Executors;
20 import java.util.concurrent.Semaphore;
21 import java.util.concurrent.ThreadFactory;
22 import java.util.concurrent.TimeUnit;
23
24 import org.simantics.acorn.MainProgram.MainProgramRunnable;
25 import org.simantics.acorn.exception.AcornAccessVerificationException;
26 import org.simantics.acorn.exception.IllegalAcornStateException;
27 import org.simantics.acorn.internal.ClusterChange;
28 import org.simantics.acorn.internal.ClusterUpdateProcessorBase;
29 import org.simantics.acorn.internal.UndoClusterUpdateProcessor;
30 import org.simantics.acorn.lru.ClusterInfo;
31 import org.simantics.acorn.lru.ClusterStreamChunk;
32 import org.simantics.acorn.lru.ClusterUpdateOperation;
33 import org.simantics.acorn.lru.ClusterChangeSet.Entry;
34 import org.simantics.db.ClusterCreator;
35 import org.simantics.db.Database;
36 import org.simantics.db.ServiceLocator;
37 import org.simantics.db.common.utils.Logger;
38 import org.simantics.db.exception.DatabaseException;
39 import org.simantics.db.exception.SDBException;
40 import org.simantics.db.server.ProCoreException;
41 import org.simantics.db.service.ClusterSetsSupport;
42 import org.simantics.db.service.ClusterUID;
43 import org.simantics.db.service.LifecycleSupport;
44 import org.simantics.utils.datastructures.Pair;
45 import org.simantics.utils.logging.TimeLogger;
46
47 import gnu.trove.map.hash.TLongObjectHashMap;
48
49 public class GraphClientImpl2 implements Database.Session {
50         
51         public static final boolean DEBUG = false;
52
53         public final ClusterManager clusters;
54         
55         private TransactionManager transactionManager = new TransactionManager();
56         private ExecutorService executor = Executors.newSingleThreadExecutor(new ClientThreadFactory("Core Main Program", false));
57         private ExecutorService saver = Executors.newSingleThreadExecutor(new ClientThreadFactory("Core Snapshot Saver", true));
58
59         private Path dbFolder;
60         private final Database database;
61         private ServiceLocator locator;
62         private MainProgram mainProgram;
63
64         static class ClientThreadFactory implements ThreadFactory {
65                 
66                 final String name;
67                 final boolean daemon;
68                 
69                 public ClientThreadFactory(String name, boolean daemon) {
70                         this.name = name;
71                         this.daemon = daemon;
72                 }
73                 
74                 @Override
75                 public Thread newThread(Runnable r) {
76                         Thread thread = new Thread(r, name);
77                         thread.setDaemon(daemon);
78                         return thread;
79                 }
80         }
81
82         public GraphClientImpl2(Database database, Path dbFolder, ServiceLocator locator) throws IOException {
83             this.database = database;
84             this.dbFolder = dbFolder;
85             this.locator = locator;
86             this.clusters = new ClusterManager(dbFolder);
87             load();
88             ClusterSetsSupport cssi = locator.getService(ClusterSetsSupport.class); 
89             cssi.setReadDirectory(clusters.lastSessionDirectory);
90             cssi.updateWriteDirectory(clusters.workingDirectory);
91             mainProgram = new MainProgram(this, clusters);
92             executor.execute(mainProgram);
93         }
94
95         public Path getDbFolder() {
96             return dbFolder;
97         }
98
99         public void tryMakeSnapshot() throws IOException {
100                 
101             if (isClosing || unexpectedClose)
102                 return;
103             
104                 saver.execute(new Runnable() {
105
106                         @Override
107                         public void run() {
108                                 Transaction tr = null;
109                                 try {
110                                         // First take a write transaction
111                                         tr = askWriteTransaction(-1);
112                                         // Then make sure that MainProgram is idling
113                                         mainProgram.mutex.acquire();
114                                         try {
115                                                 synchronized(mainProgram) {
116                                                         if(mainProgram.operations.isEmpty()) {
117                                                                 makeSnapshot(false);
118                                                         } else {
119                                                                 // MainProgram is becoming busy again - delay snapshotting
120                                                                 return;
121                                                         }
122                                                 }
123                                         } finally {
124                                                 mainProgram.mutex.release();
125                                         }
126                                 } catch (IllegalAcornStateException | ProCoreException e) {
127                                         Logger.defaultLogError("Snapshotting failed", e);
128                                         unexpectedClose = true;
129                                 } catch (InterruptedException e) {
130                                         Logger.defaultLogError("Snapshotting interrupted", e);
131                                 } finally {
132                                         try {
133                                                 if(tr != null)
134                                                         endTransaction(tr.getTransactionId());
135                                                 if (unexpectedClose) {
136                                                 LifecycleSupport support = getServiceLocator().getService(LifecycleSupport.class);
137                             try {
138                                 support.close();
139                             } catch (DatabaseException e1) {
140                                 Logger.defaultLogError("Failed to close database as a safety measure due to failed snapshotting", e1);
141                             }
142                                                 }
143                                         } catch (ProCoreException e) {
144                                                 Logger.defaultLogError("Failed to end snapshotting write transaction", e);
145                                         }
146                                 }
147                         }
148                 });
149         }
150         
151     public void makeSnapshot(boolean fullSave) throws IllegalAcornStateException {
152         clusters.makeSnapshot(locator, fullSave);
153     }
154         
155         public <T> T clone(ClusterUID uid, ClusterCreator creator) throws DatabaseException {
156             try {
157             return clusters.clone(uid, creator);
158         } catch (AcornAccessVerificationException | IllegalAcornStateException | IOException e) {
159             unexpectedClose = true;
160             throw new DatabaseException(e);
161         }
162         }
163
164 //      private void save() throws IOException {
165 //              clusters.save();
166 //      }
167         
168         public void load() throws IOException {
169                 clusters.load();
170         }
171         
172 //      public void modiFileEx(ClusterUID uid, int resourceKey, long offset, long size, byte[] bytes, long pos, ClusterSupport support) {
173 //              clusters.modiFileEx(uid, resourceKey, offset, size, bytes, pos, support);
174 //      }
175
176         @Override
177         public Database getDatabase() {
178                 return database;
179         }
180
181         private boolean closed = false;
182         private boolean isClosing = false;
183         private boolean unexpectedClose = false;
184         
185         @Override
186         public void close() throws ProCoreException {
187             System.err.println("Closing " + this + " and mainProgram " + mainProgram);
188                 if(!closed && !isClosing) {
189                     isClosing = true;
190                         try {
191                             if (!unexpectedClose)
192                                 makeSnapshot(true);
193                                 
194                                 mainProgram.close();
195                                 clusters.shutdown();
196                                 executor.shutdown();
197                                 saver.shutdown();
198                                 boolean executorTerminated = executor.awaitTermination(500, TimeUnit.MILLISECONDS);
199                                 boolean saverTerminated = saver.awaitTermination(500, TimeUnit.MILLISECONDS);
200                                 
201                                 System.err.println("executorTerminated=" + executorTerminated + ", saverTerminated=" + saverTerminated);
202                                 
203                                 mainProgram = null;
204                                 executor = null;
205                                 saver = null;
206                                 
207                         } catch (IllegalAcornStateException | InterruptedException e) {
208                                 throw new ProCoreException(e);
209                         }
210                         closed = true;
211                 }
212                 //impl.close();
213         }
214
215         @Override
216         public void open() throws ProCoreException {
217                 throw new UnsupportedOperationException();
218         }
219
220         @Override
221         public boolean isClosed() throws ProCoreException {
222                 return closed;
223         }
224         
225         @Override
226         public void acceptCommit(long transactionId, long changeSetId, byte[] metadata) throws ProCoreException {
227                 clusters.state.headChangeSetId++;
228                 long committedChangeSetId = changeSetId + 1;
229                 try {
230                 clusters.commitChangeSet(committedChangeSetId, metadata);
231                 
232                 clusters.state.transactionId = transactionId;
233                 
234                 mainProgram.committed();
235                 
236                 TimeLogger.log("Accepted commit");
237                 } catch (IllegalAcornStateException e) {
238                     throw new ProCoreException(e);
239                 }
240         }
241
242         @Override
243         public long cancelCommit(long transactionId, long changeSetId, byte[] metadata, OnChangeSetUpdate onChangeSetUpdate) throws ProCoreException {
244                 // Accept and finalize current transaction and then undo it
245                 acceptCommit(transactionId, changeSetId, metadata);
246
247                 try {
248                         undo(new long[] {changeSetId+1}, onChangeSetUpdate);
249                         clusters.state.headChangeSetId++;
250                         return clusters.state.headChangeSetId;
251                 } catch (SDBException e) {
252                         Logger.defaultLogError("Failed to undo cancelled transaction", e);
253                         throw new ProCoreException(e);
254                 }
255         }
256
257         @Override
258         public Transaction askReadTransaction() throws ProCoreException {
259                 return transactionManager.askReadTransaction();
260         }
261
262         enum TransactionState {
263                 IDLE,WRITE,READ
264         }
265         
266         class TransactionRequest {
267                 public TransactionState state;
268                 public Semaphore semaphore;
269                 public TransactionRequest(TransactionState state, Semaphore semaphore) {
270                         this.state = state;
271                         this.semaphore = semaphore;
272                 }
273         }
274
275         class TransactionManager {
276
277                 private TransactionState currentTransactionState = TransactionState.IDLE;
278                 
279                 private int reads = 0;
280                 
281                 LinkedList<TransactionRequest> requests = new LinkedList<TransactionRequest>();
282                 
283                 TLongObjectHashMap<TransactionRequest> requestMap = new TLongObjectHashMap<TransactionRequest>();
284                 
285                 private synchronized Transaction makeTransaction(TransactionRequest req) {
286                         
287                         final int csId = clusters.state.headChangeSetId;
288                         final long trId = clusters.state.transactionId+1;
289                         requestMap.put(trId, req);
290                         return new Transaction() {
291                                 
292                                 @Override
293                                 public long getTransactionId() {
294                                         return trId;
295                                 }
296                                 
297                                 @Override
298                                 public long getHeadChangeSetId() {
299                                         return csId;
300                                 }
301                         };
302                 }
303                 
304                 /*
305                  * This method cannot be synchronized since it waits and must support multiple entries
306                  * by query thread(s) and internal transactions such as snapshot saver
307                  */
308                 public Transaction askReadTransaction() throws ProCoreException {
309                 
310                         Semaphore semaphore = new Semaphore(0);
311                         
312                         TransactionRequest req = queue(TransactionState.READ, semaphore);
313                         
314                         try {
315                                 semaphore.acquire();
316                         } catch (InterruptedException e) {
317                                 throw new ProCoreException(e);
318                         }
319                         
320                         return makeTransaction(req);
321
322                 }
323
324                 private synchronized void dispatch() {
325                         TransactionRequest r = requests.removeFirst();
326                         if(r.state == TransactionState.READ) reads++;
327                         r.semaphore.release();
328                 }
329                 
330                 private synchronized void processRequests() {
331                         
332                         while(true) {
333
334                                 if(requests.isEmpty()) return;
335                                 TransactionRequest req = requests.peek();
336
337                                 if(currentTransactionState == TransactionState.IDLE) {
338                                 
339                                         // Accept anything while IDLE
340                                         currentTransactionState = req.state;
341                                         dispatch();
342                                         
343                                 } else if (currentTransactionState == TransactionState.READ) {
344                                         
345                                         if(req.state == currentTransactionState) {
346
347                                                 // Allow other reads
348                                                 dispatch();
349
350                                         } else {
351                                                 
352                                                 // Wait
353                                                 return;
354                                                 
355                                         }
356                                         
357                                 }  else if (currentTransactionState == TransactionState.WRITE) {
358
359                                         // Wait
360                                         return;
361                                         
362                                 }
363                                 
364                         }
365                         
366                 }
367                 
368                 private synchronized TransactionRequest queue(TransactionState state, Semaphore semaphore) {
369                         TransactionRequest req = new TransactionRequest(state, semaphore); 
370                         requests.addLast(req);
371                         processRequests();
372                         return req;
373                 }
374                 
375                 /*
376                  * This method cannot be synchronized since it waits and must support multiple entries
377                  * by query thread(s) and internal transactions such as snapshot saver
378                  */
379                 public Transaction askWriteTransaction() throws IllegalAcornStateException {
380                         
381                         Semaphore semaphore = new Semaphore(0);
382                         TransactionRequest req = queue(TransactionState.WRITE, semaphore);
383                         
384                         try {
385                                 semaphore.acquire();
386                         } catch (InterruptedException e) {
387                                 throw new IllegalAcornStateException(e);
388                         }
389                         mainProgram.startTransaction(clusters.state.headChangeSetId+1);
390                         return makeTransaction(req);
391                 }
392                 
393                 public synchronized long endTransaction(long transactionId) throws ProCoreException {
394                         
395                         TransactionRequest req = requestMap.remove(transactionId);
396                         if(req.state == TransactionState.WRITE) {
397                                 currentTransactionState = TransactionState.IDLE;
398                                 processRequests();
399                         } else {
400                                 reads--;
401                                 if(reads == 0) {
402                                         currentTransactionState = TransactionState.IDLE;
403                                         processRequests();
404                                 }
405                         }
406                         return clusters.state.transactionId;
407                 }
408
409         }
410         
411         @Override
412         public Transaction askWriteTransaction(final long transactionId) throws ProCoreException {
413                 try {
414                     if (isClosing || unexpectedClose || closed) {
415                         throw new ProCoreException("GraphClientImpl2 is already closing so no more write transactions allowed!");
416                     }
417             return transactionManager.askWriteTransaction();
418         } catch (IllegalAcornStateException e) {
419             throw new ProCoreException(e);
420         }
421         }
422
423         @Override
424         public long endTransaction(long transactionId) throws ProCoreException {
425                 return transactionManager.endTransaction(transactionId);
426         }
427
428         @Override
429         public String execute(String command) throws ProCoreException {
430                 // This is called only by WriteGraphImpl.commitAccessorChanges
431                 // We can ignore this in Acorn
432                 return "";
433         }
434
435         @Override
436         public byte[] getChangeSetMetadata(long changeSetId) throws ProCoreException {
437                 try {
438             return clusters.getMetadata(changeSetId);
439         } catch (AcornAccessVerificationException | IllegalAcornStateException e) {
440             throw new ProCoreException(e);
441         }
442         }
443
444         @Override
445         public ChangeSetData getChangeSetData(long minChangeSetId,
446                         long maxChangeSetId, OnChangeSetUpdate onChangeSetupate)
447                         throws ProCoreException {
448                 
449                 new Exception("GetChangeSetDataFunction " + minChangeSetId + " " + maxChangeSetId).printStackTrace();;
450                 return null;
451                 
452         }
453
454         @Override
455         public ChangeSetIds getChangeSetIds() throws ProCoreException {
456                 throw new UnsupportedOperationException();
457         }
458
459         @Override
460         public Cluster getCluster(byte[] clusterId) throws ProCoreException {
461                 throw new UnsupportedOperationException();
462         }
463
464         @Override
465         public ClusterChanges getClusterChanges(long changeSetId, byte[] clusterId)
466                         throws ProCoreException {
467                 throw new UnsupportedOperationException();
468         }
469
470         @Override
471         public ClusterIds getClusterIds() throws ProCoreException {
472                 try {
473             return clusters.getClusterIds();
474         } catch (IllegalAcornStateException e) {
475             throw new ProCoreException(e);
476         }
477         }
478
479         @Override
480         public Information getInformation() throws ProCoreException {
481                 return new Information() {
482
483                         @Override
484                         public String getServerId() {
485                                 return "server";
486                         }
487
488                         @Override
489                         public String getProtocolId() {
490                                 return "";
491                         }
492
493                         @Override
494                         public String getDatabaseId() {
495                                 return "database";
496                         }
497
498                         @Override
499                         public long getFirstChangeSetId() {
500                                 return 0;
501                         }
502                         
503                 };
504         }
505
506         @Override
507         public Refresh getRefresh(long changeSetId) throws ProCoreException {
508                 
509                 final ClusterIds ids = getClusterIds();
510                 
511                 return new Refresh() {
512
513                         @Override
514                         public long getHeadChangeSetId() {
515                                 return clusters.state.headChangeSetId;
516                         }
517
518                         @Override
519                         public long[] getFirst() {
520                                 return ids.getFirst();
521                         }
522
523                         @Override
524                         public long[] getSecond() {
525                                 return ids.getSecond();
526                         }
527                         
528                 };
529                 
530         }
531
532         public byte[] getResourceFile(final byte[] clusterUID, final int resourceIndex) throws ProCoreException, AcornAccessVerificationException, IllegalAcornStateException {
533                 return clusters.getResourceFile(clusterUID, resourceIndex);
534         }
535
536         @Override
537         public ResourceSegment getResourceSegment(final byte[] clusterUID, final int resourceIndex, final long segmentOffset, short segmentSize) throws ProCoreException {
538                 try {
539             return clusters.getResourceSegment(clusterUID, resourceIndex, segmentOffset, segmentSize);
540         } catch (AcornAccessVerificationException | IllegalAcornStateException e) {
541             throw new ProCoreException(e);
542         }
543         }
544
545         @Override
546         public long reserveIds(int count) throws ProCoreException {
547                 return clusters.state.reservedIds++;
548         }
549
550         @Override
551         public void updateCluster(byte[] operations) throws ProCoreException {
552             ClusterInfo info = null;
553             try {
554                 ClusterUpdateOperation operation = new ClusterUpdateOperation(clusters, operations);
555                 info = clusters.clusterLRU.getOrCreate(operation.uid, true);
556                 if(info == null)
557                     throw new IllegalAcornStateException("info == null for operation " + operation);
558                 info.acquireMutex();
559                         info.scheduleUpdate();
560                         mainProgram.schedule(operation);
561                 } catch (IllegalAcornStateException | AcornAccessVerificationException e) {
562             throw new ProCoreException(e);
563         } finally {
564             if (info != null)
565                 info.releaseMutex();
566                 }
567         }
568
569         private UndoClusterUpdateProcessor getUndoCSS(String ccsId) throws DatabaseException, AcornAccessVerificationException, IllegalAcornStateException {
570
571                 String[] ss = ccsId.split("\\.");
572                 String chunkKey = ss[0];
573                 int chunkOffset = Integer.parseInt(ss[1]);
574                 ClusterStreamChunk chunk = clusters.streamLRU.getWithoutMutex(chunkKey);
575                 if(chunk == null) throw new IllegalAcornStateException("Cluster Stream Chunk " + chunkKey + " was not found.");
576                 chunk.acquireMutex();
577                 try {
578                         return chunk.getUndoProcessor(clusters, chunkOffset, ccsId);
579                 } catch (DatabaseException e) {
580                     throw e;
581                 } catch (Throwable t) {
582                         throw new IllegalStateException(t);
583                 } finally {
584                         chunk.releaseMutex();
585                 }
586         }
587         
588         private void performUndo(String ccsId, ArrayList<Pair<ClusterUID, byte[]>> clusterChanges, UndoClusterSupport support) throws ProCoreException, DatabaseException, IllegalAcornStateException, AcornAccessVerificationException {
589                 UndoClusterUpdateProcessor proc = getUndoCSS(ccsId);
590
591                 int clusterKey = clusters.getClusterKeyByClusterUIDOrMakeWithoutMutex(proc.getClusterUID());
592
593                 clusters.clusterLRU.acquireMutex();
594                 try {
595
596                         ClusterChange cs = new ClusterChange(clusterChanges, proc.getClusterUID());
597                         for(int i=0;i<proc.entries.size();i++) {
598                                 
599                                 Entry e = proc.entries.get(proc.entries.size() - 1 - i);
600                                 e.process(clusters, cs, clusterKey);
601                         }
602                         cs.flush();
603
604                 } finally {
605                         clusters.clusterLRU.releaseMutex();
606                 }
607         }
608
609         @Override
610         public boolean undo(long[] changeSetIds, OnChangeSetUpdate onChangeSetUpdate) throws SDBException {
611
612                 Exception exception = mainProgram.runIdle(new MainProgramRunnable() {
613
614                         @Override
615                         public void run() throws Exception {
616
617                                 try {
618
619                                 final ArrayList<Pair<ClusterUID, byte[]>> clusterChanges = new ArrayList<Pair<ClusterUID, byte[]>>();
620                                 
621                                 UndoClusterSupport support = new UndoClusterSupport(clusters);
622                                 
623                                 final int changeSetId = clusters.state.headChangeSetId;
624                                 
625                                 if(ClusterUpdateProcessorBase.DEBUG)
626                                         System.err.println(" === BEGIN UNDO ===");
627                                 
628                                 for(int i=0;i<changeSetIds.length;i++) {
629                                         final long id = changeSetIds[changeSetIds.length-1-i];
630                                         ArrayList<String> ccss = clusters.getChanges(id);
631                     
632                                         for(int j=0;j<ccss.size();j++) {
633                                                 String ccsid = ccss.get(ccss.size()-j-1);
634                                                 try {
635                                                         if(ClusterUpdateProcessorBase.DEBUG)
636                                                                 System.err.println("performUndo " + ccsid);
637                                                         performUndo(ccsid, clusterChanges, support);
638                                                 } catch (DatabaseException e) {
639                                                         e.printStackTrace();
640                                                 }
641                                         }
642                                 }
643                     
644                                 if(ClusterUpdateProcessorBase.DEBUG)
645                                         System.err.println(" === END UNDO ===");
646                     
647                                 for(int i=0;i<clusterChanges.size();i++) {
648                                         
649                                         final int changeSetIndex = i;
650                                         
651                                         final Pair<ClusterUID, byte[]> pair = clusterChanges.get(i);
652                                         
653                                         final ClusterUID cuid = pair.first;
654                                         final byte[] data = pair.second;
655                     
656                                         onChangeSetUpdate.onChangeSetUpdate(new ChangeSetUpdate() {
657                     
658                                                 @Override
659                                                 public long getChangeSetId() {
660                                                         return changeSetId;
661                                                 }
662                     
663                                                 @Override
664                                                 public int getChangeSetIndex() {
665                                                         return 0;
666                                                 }
667                     
668                                                 @Override
669                                                 public int getNumberOfClusterChangeSets() {
670                                                         return clusterChanges.size();
671                                                 }
672                     
673                                                 @Override
674                                                 public int getIndexOfClusterChangeSet() {
675                                                         return changeSetIndex;
676                                                 }
677                     
678                                                 @Override
679                                                 public byte[] getClusterId() {
680                                                         return cuid.asBytes();
681                                                 }
682                     
683                                                 @Override
684                                                 public boolean getNewCluster() {
685                                                         return false;
686                                                 }
687                     
688                                                 @Override
689                                                 public byte[] getData() {
690                                                         return data;
691                                                 }
692                     
693                                         });
694                                 }
695                         } catch (AcornAccessVerificationException | IllegalAcornStateException e1) {
696                             throw new ProCoreException(e1);
697                         }
698
699                         }
700
701                         @Override
702                         public void done() {
703
704                         }
705
706                 });
707
708                 if(exception instanceof SDBException) throw (SDBException)exception;
709                 else if(exception != null) throw new IllegalAcornStateException(exception);
710                 
711                 return false;
712                 
713         }
714         
715         public ServiceLocator getServiceLocator() {
716             return locator;
717         }
718
719     @Override
720     public boolean refreshEnabled() {
721         return false;
722     }
723
724     @Override
725     public boolean rolledback() {
726         return clusters.rolledback();
727     }
728
729         
730         
731         
732         
733         
734         
735         
736         
737         
738         
739         ////////////////////////
740         
741         
742         
743         
744         
745         
746         
747         
748         
749         
750         
751         
752 }
753