gerrit.simantics Code Review - simantics/platform.git/blob - bundles/org.simantics.acorn/src/org/simantics/acorn/GraphClientImpl2.java
Merge "Cluster sets are written to wrong directory until first snapshot is made"
[simantics/platform.git] / bundles / org.simantics.acorn / src / org / simantics / acorn / GraphClientImpl2.java
1 /*******************************************************************************
2  * Copyright (c) 2007, 2010 Association for Decentralized Information Management
3  * in Industry THTH ry.
4  * All rights reserved. This program and the accompanying materials
5  * are made available under the terms of the Eclipse Public License v1.0
6  * which accompanies this distribution, and is available at
7  * http://www.eclipse.org/legal/epl-v10.html
8  *
9  * Contributors:
10  *     VTT Technical Research Centre of Finland - initial API and implementation
11  *******************************************************************************/
12 package org.simantics.acorn;
13
14 import java.io.IOException;
15 import java.nio.file.Path;
16 import java.util.ArrayList;
17 import java.util.LinkedList;
18 import java.util.concurrent.ExecutorService;
19 import java.util.concurrent.Executors;
20 import java.util.concurrent.Semaphore;
21 import java.util.concurrent.ThreadFactory;
22 import java.util.concurrent.TimeUnit;
23
24 import org.simantics.acorn.exception.AcornAccessVerificationException;
25 import org.simantics.acorn.exception.IllegalAcornStateException;
26 import org.simantics.acorn.internal.ClusterChange;
27 import org.simantics.acorn.internal.ClusterUpdateProcessorBase;
28 import org.simantics.acorn.internal.UndoClusterUpdateProcessor;
29 import org.simantics.acorn.lru.ClusterInfo;
30 import org.simantics.acorn.lru.ClusterStreamChunk;
31 import org.simantics.acorn.lru.ClusterUpdateOperation;
32 import org.simantics.acorn.lru.ClusterChangeSet.Entry;
33 import org.simantics.db.ClusterCreator;
34 import org.simantics.db.Database;
35 import org.simantics.db.ServiceLocator;
36 import org.simantics.db.common.utils.Logger;
37 import org.simantics.db.exception.DatabaseException;
38 import org.simantics.db.exception.SDBException;
39 import org.simantics.db.server.ProCoreException;
40 import org.simantics.db.service.ClusterSetsSupport;
41 import org.simantics.db.service.ClusterUID;
42 import org.simantics.db.service.LifecycleSupport;
43 import org.simantics.utils.datastructures.Pair;
44 import org.simantics.utils.logging.TimeLogger;
45
46 import gnu.trove.map.hash.TLongObjectHashMap;
47
48 public class GraphClientImpl2 implements Database.Session {
49         
50         public static final boolean DEBUG = false;
51
52         public final ClusterManager clusters;
53         
54         private TransactionManager transactionManager = new TransactionManager();
55         private ExecutorService executor = Executors.newSingleThreadExecutor(new ClientThreadFactory("Core Main Program", false));
56         private ExecutorService saver = Executors.newSingleThreadExecutor(new ClientThreadFactory("Core Snapshot Saver", true));
57
58         private Path dbFolder;
59         private final Database database;
60         private ServiceLocator locator;
61         private MainProgram mainProgram;
62
63         static class ClientThreadFactory implements ThreadFactory {
64                 
65                 final String name;
66                 final boolean daemon;
67                 
68                 public ClientThreadFactory(String name, boolean daemon) {
69                         this.name = name;
70                         this.daemon = daemon;
71                 }
72                 
73                 @Override
74                 public Thread newThread(Runnable r) {
75                         Thread thread = new Thread(r, name);
76                         thread.setDaemon(daemon);
77                         return thread;
78                 }
79         }
80
81         public GraphClientImpl2(Database database, Path dbFolder, ServiceLocator locator) throws IOException {
82             this.database = database;
83             this.dbFolder = dbFolder;
84             this.locator = locator;
85             this.clusters = new ClusterManager(dbFolder);
86             load();
87             ClusterSetsSupport cssi = locator.getService(ClusterSetsSupport.class); 
88             cssi.setReadDirectory(clusters.lastSessionDirectory);
89             cssi.updateWriteDirectory(clusters.workingDirectory);
90             mainProgram = new MainProgram(this, clusters);
91             executor.execute(mainProgram);
92         }
93
94         public Path getDbFolder() {
95             return dbFolder;
96         }
97
98         public void tryMakeSnapshot() throws IOException {
99                 
100             if (isClosing || unexpectedClose)
101                 return;
102             
103                 saver.execute(new Runnable() {
104
105                         @Override
106                         public void run() {
107                                 Transaction tr = null;
108                                 try {
109                                         // First take a write transaction
110                                         tr = askWriteTransaction(-1);
111                                         // Then make sure that MainProgram is idling
112                                         mainProgram.mutex.acquire();
113                                         try {
114                                                 synchronized(mainProgram) {
115                                                         if(mainProgram.operations.isEmpty()) {
116                                                                 makeSnapshot(false);
117                                                         } else {
118                                                                 // MainProgram is becoming busy again - delay snapshotting
119                                                                 return;
120                                                         }
121                                                 }
122                                         } finally {
123                                                 mainProgram.mutex.release();
124                                         }
125                                 } catch (IllegalAcornStateException | ProCoreException e) {
126                                         Logger.defaultLogError("Snapshotting failed", e);
127                                         unexpectedClose = true;
128                                 } catch (InterruptedException e) {
129                                         Logger.defaultLogError("Snapshotting interrupted", e);
130                                 } finally {
131                                         try {
132                                                 if(tr != null)
133                                                         endTransaction(tr.getTransactionId());
134                                                 if (unexpectedClose) {
135                                                 LifecycleSupport support = getServiceLocator().getService(LifecycleSupport.class);
136                             try {
137                                 support.close();
138                             } catch (DatabaseException e1) {
139                                 Logger.defaultLogError("Failed to close database as a safety measure due to failed snapshotting", e1);
140                             }
141                                                 }
142                                         } catch (ProCoreException e) {
143                                                 Logger.defaultLogError("Failed to end snapshotting write transaction", e);
144                                         }
145                                 }
146                         }
147                 });
148         }
149         
150     public void makeSnapshot(boolean fullSave) throws IllegalAcornStateException {
151         clusters.makeSnapshot(locator, fullSave);
152     }
153         
154         public <T> T clone(ClusterUID uid, ClusterCreator creator) throws DatabaseException {
155             try {
156             return clusters.clone(uid, creator);
157         } catch (AcornAccessVerificationException | IllegalAcornStateException | IOException e) {
158             unexpectedClose = true;
159             throw new DatabaseException(e);
160         }
161         }
162
163 //      private void save() throws IOException {
164 //              clusters.save();
165 //      }
166         
167         public void load() throws IOException {
168                 clusters.load();
169         }
170         
171 //      public void modiFileEx(ClusterUID uid, int resourceKey, long offset, long size, byte[] bytes, long pos, ClusterSupport support) {
172 //              clusters.modiFileEx(uid, resourceKey, offset, size, bytes, pos, support);
173 //      }
174
175         @Override
176         public Database getDatabase() {
177                 return database;
178         }
179
180         private boolean closed = false;
181         private boolean isClosing = false;
182         private boolean unexpectedClose = false;
183         
184         @Override
185         public void close() throws ProCoreException {
186             System.err.println("Closing " + this + " and mainProgram " + mainProgram);
187                 if(!closed && !isClosing) {
188                     isClosing = true;
189                         try {
190                             if (!unexpectedClose)
191                                 makeSnapshot(true);
192                                 
193                                 mainProgram.close();
194                                 clusters.shutdown();
195                                 executor.shutdown();
196                                 saver.shutdown();
197                                 boolean executorTerminated = executor.awaitTermination(500, TimeUnit.MILLISECONDS);
198                                 boolean saverTerminated = saver.awaitTermination(500, TimeUnit.MILLISECONDS);
199                                 
200                                 System.err.println("executorTerminated=" + executorTerminated + ", saverTerminated=" + saverTerminated);
201                                 
202                                 mainProgram = null;
203                                 executor = null;
204                                 saver = null;
205                                 
206                         } catch (IllegalAcornStateException | InterruptedException e) {
207                                 throw new ProCoreException(e);
208                         }
209                         closed = true;
210                 }
211                 //impl.close();
212         }
213
214         @Override
215         public void open() throws ProCoreException {
216                 throw new UnsupportedOperationException();
217         }
218
219         @Override
220         public boolean isClosed() throws ProCoreException {
221                 return closed;
222         }
223         
224         @Override
225         public void acceptCommit(long transactionId, long changeSetId, byte[] metadata) throws ProCoreException {
226                 clusters.state.headChangeSetId++;
227                 long committedChangeSetId = changeSetId + 1;
228                 try {
229                 clusters.commitChangeSet(committedChangeSetId, metadata);
230                 
231                 clusters.state.transactionId = transactionId;
232                 
233                 mainProgram.committed();
234                 
235                 TimeLogger.log("Accepted commit");
236                 } catch (IllegalAcornStateException e) {
237                     throw new ProCoreException(e);
238                 }
239         }
240
241         @Override
242         public long cancelCommit(long transactionId, long changeSetId, byte[] metadata, OnChangeSetUpdate onChangeSetUpdate) throws ProCoreException {
243                 // Accept and finalize current transaction and then undo it
244                 acceptCommit(transactionId, changeSetId, metadata);
245
246                 try {
247                         undo(new long[] {changeSetId+1}, onChangeSetUpdate);
248                         clusters.state.headChangeSetId++;
249                         return clusters.state.headChangeSetId;
250                 } catch (SDBException e) {
251                         Logger.defaultLogError("Failed to undo cancelled transaction", e);
252                         throw new ProCoreException(e);
253                 }
254         }
255
256         @Override
257         public Transaction askReadTransaction() throws ProCoreException {
258                 return transactionManager.askReadTransaction();
259         }
260
/** Kind of transaction that is running, or being requested. */
enum TransactionState {
    /** No transaction is running. */
    IDLE,
    /** A write transaction. */
    WRITE,
    /** A read transaction. */
    READ
}
264         
265         class TransactionRequest {
266                 public TransactionState state;
267                 public Semaphore semaphore;
268                 public TransactionRequest(TransactionState state, Semaphore semaphore) {
269                         this.state = state;
270                         this.semaphore = semaphore;
271                 }
272         }
273
274         class TransactionManager {
275
276                 private TransactionState currentTransactionState = TransactionState.IDLE;
277                 
278                 private int reads = 0;
279                 
280                 LinkedList<TransactionRequest> requests = new LinkedList<TransactionRequest>();
281                 
282                 TLongObjectHashMap<TransactionRequest> requestMap = new TLongObjectHashMap<TransactionRequest>();
283                 
284                 private synchronized Transaction makeTransaction(TransactionRequest req) {
285                         
286                         final int csId = clusters.state.headChangeSetId;
287                         final long trId = clusters.state.transactionId+1;
288                         requestMap.put(trId, req);
289                         return new Transaction() {
290                                 
291                                 @Override
292                                 public long getTransactionId() {
293                                         return trId;
294                                 }
295                                 
296                                 @Override
297                                 public long getHeadChangeSetId() {
298                                         return csId;
299                                 }
300                         };
301                 }
302                 
303                 /*
304                  * This method cannot be synchronized since it waits and must support multiple entries
305                  * by query thread(s) and internal transactions such as snapshot saver
306                  */
307                 public Transaction askReadTransaction() throws ProCoreException {
308                 
309                         Semaphore semaphore = new Semaphore(0);
310                         
311                         TransactionRequest req = queue(TransactionState.READ, semaphore);
312                         
313                         try {
314                                 semaphore.acquire();
315                         } catch (InterruptedException e) {
316                                 throw new ProCoreException(e);
317                         }
318                         
319                         return makeTransaction(req);
320
321                 }
322
323                 private synchronized void dispatch() {
324                         TransactionRequest r = requests.removeFirst();
325                         if(r.state == TransactionState.READ) reads++;
326                         r.semaphore.release();
327                 }
328                 
329                 private synchronized void processRequests() {
330                         
331                         while(true) {
332
333                                 if(requests.isEmpty()) return;
334                                 TransactionRequest req = requests.peek();
335
336                                 if(currentTransactionState == TransactionState.IDLE) {
337                                 
338                                         // Accept anything while IDLE
339                                         currentTransactionState = req.state;
340                                         dispatch();
341                                         
342                                 } else if (currentTransactionState == TransactionState.READ) {
343                                         
344                                         if(req.state == currentTransactionState) {
345
346                                                 // Allow other reads
347                                                 dispatch();
348
349                                         } else {
350                                                 
351                                                 // Wait
352                                                 return;
353                                                 
354                                         }
355                                         
356                                 }  else if (currentTransactionState == TransactionState.WRITE) {
357
358                                         // Wait
359                                         return;
360                                         
361                                 }
362                                 
363                         }
364                         
365                 }
366                 
367                 private synchronized TransactionRequest queue(TransactionState state, Semaphore semaphore) {
368                         TransactionRequest req = new TransactionRequest(state, semaphore); 
369                         requests.addLast(req);
370                         processRequests();
371                         return req;
372                 }
373                 
374                 /*
375                  * This method cannot be synchronized since it waits and must support multiple entries
376                  * by query thread(s) and internal transactions such as snapshot saver
377                  */
378                 public Transaction askWriteTransaction() throws IllegalAcornStateException {
379                         
380                         Semaphore semaphore = new Semaphore(0);
381                         TransactionRequest req = queue(TransactionState.WRITE, semaphore);
382                         
383                         try {
384                                 semaphore.acquire();
385                         } catch (InterruptedException e) {
386                                 throw new IllegalAcornStateException(e);
387                         }
388                         mainProgram.startTransaction(clusters.state.headChangeSetId+1);
389                         return makeTransaction(req);
390                 }
391                 
392                 public synchronized long endTransaction(long transactionId) throws ProCoreException {
393                         
394                         TransactionRequest req = requestMap.remove(transactionId);
395                         if(req.state == TransactionState.WRITE) {
396                                 currentTransactionState = TransactionState.IDLE;
397                                 processRequests();
398                         } else {
399                                 reads--;
400                                 if(reads == 0) {
401                                         currentTransactionState = TransactionState.IDLE;
402                                         processRequests();
403                                 }
404                         }
405                         return clusters.state.transactionId;
406                 }
407
408         }
409         
410         @Override
411         public Transaction askWriteTransaction(final long transactionId) throws ProCoreException {
412                 try {
413                     if (isClosing || unexpectedClose || closed) {
414                         throw new ProCoreException("GraphClientImpl2 is already closing so no more write transactions allowed!");
415                     }
416             return transactionManager.askWriteTransaction();
417         } catch (IllegalAcornStateException e) {
418             throw new ProCoreException(e);
419         }
420         }
421
422         @Override
423         public long endTransaction(long transactionId) throws ProCoreException {
424                 return transactionManager.endTransaction(transactionId);
425         }
426
427         @Override
428         public String execute(String command) throws ProCoreException {
429                 // This is called only by WriteGraphImpl.commitAccessorChanges
430                 // We can ignore this in Acorn
431                 return "";
432         }
433
434         @Override
435         public byte[] getChangeSetMetadata(long changeSetId) throws ProCoreException {
436                 try {
437             return clusters.getMetadata(changeSetId);
438         } catch (AcornAccessVerificationException | IllegalAcornStateException e) {
439             throw new ProCoreException(e);
440         }
441         }
442
443         @Override
444         public ChangeSetData getChangeSetData(long minChangeSetId,
445                         long maxChangeSetId, OnChangeSetUpdate onChangeSetupate)
446                         throws ProCoreException {
447                 
448                 new Exception("GetChangeSetDataFunction " + minChangeSetId + " " + maxChangeSetId).printStackTrace();;
449                 return null;
450                 
451         }
452
453         @Override
454         public ChangeSetIds getChangeSetIds() throws ProCoreException {
455                 throw new UnsupportedOperationException();
456         }
457
458         @Override
459         public Cluster getCluster(byte[] clusterId) throws ProCoreException {
460                 throw new UnsupportedOperationException();
461         }
462
463         @Override
464         public ClusterChanges getClusterChanges(long changeSetId, byte[] clusterId)
465                         throws ProCoreException {
466                 throw new UnsupportedOperationException();
467         }
468
469         @Override
470         public ClusterIds getClusterIds() throws ProCoreException {
471                 try {
472             return clusters.getClusterIds();
473         } catch (IllegalAcornStateException e) {
474             throw new ProCoreException(e);
475         }
476         }
477
478         @Override
479         public Information getInformation() throws ProCoreException {
480                 return new Information() {
481
482                         @Override
483                         public String getServerId() {
484                                 return "server";
485                         }
486
487                         @Override
488                         public String getProtocolId() {
489                                 return "";
490                         }
491
492                         @Override
493                         public String getDatabaseId() {
494                                 return "database";
495                         }
496
497                         @Override
498                         public long getFirstChangeSetId() {
499                                 return 0;
500                         }
501                         
502                 };
503         }
504
505         @Override
506         public Refresh getRefresh(long changeSetId) throws ProCoreException {
507                 
508                 final ClusterIds ids = getClusterIds();
509                 
510                 return new Refresh() {
511
512                         @Override
513                         public long getHeadChangeSetId() {
514                                 return clusters.state.headChangeSetId;
515                         }
516
517                         @Override
518                         public long[] getFirst() {
519                                 return ids.getFirst();
520                         }
521
522                         @Override
523                         public long[] getSecond() {
524                                 return ids.getSecond();
525                         }
526                         
527                 };
528                 
529         }
530
531         public byte[] getResourceFile(final byte[] clusterUID, final int resourceIndex) throws ProCoreException, AcornAccessVerificationException, IllegalAcornStateException {
532                 return clusters.getResourceFile(clusterUID, resourceIndex);
533         }
534
535         @Override
536         public ResourceSegment getResourceSegment(final byte[] clusterUID, final int resourceIndex, final long segmentOffset, short segmentSize) throws ProCoreException {
537                 try {
538             return clusters.getResourceSegment(clusterUID, resourceIndex, segmentOffset, segmentSize);
539         } catch (AcornAccessVerificationException | IllegalAcornStateException e) {
540             throw new ProCoreException(e);
541         }
542         }
543
544         @Override
545         public long reserveIds(int count) throws ProCoreException {
546                 return clusters.state.reservedIds++;
547         }
548
549         @Override
550         public void updateCluster(byte[] operations) throws ProCoreException {
551             ClusterInfo info = null;
552             try {
553                 ClusterUpdateOperation operation = new ClusterUpdateOperation(clusters, operations);
554                 info = clusters.clusterLRU.getOrCreate(operation.uid, true);
555                 if(info == null)
556                     throw new IllegalAcornStateException("info == null for operation " + operation);
557                 info.acquireMutex();
558                         info.scheduleUpdate();
559                         mainProgram.schedule(operation);
560                 } catch (IllegalAcornStateException | AcornAccessVerificationException e) {
561             throw new ProCoreException(e);
562         } finally {
563             if (info != null)
564                 info.releaseMutex();
565                 }
566         }
567
568         private UndoClusterUpdateProcessor getUndoCSS(String ccsId) throws DatabaseException, AcornAccessVerificationException, IllegalAcornStateException {
569
570                 String[] ss = ccsId.split("\\.");
571                 String chunkKey = ss[0];
572                 int chunkOffset = Integer.parseInt(ss[1]);
573                 ClusterStreamChunk chunk = clusters.streamLRU.getWithoutMutex(chunkKey);
574                 if(chunk == null) throw new IllegalAcornStateException("Cluster Stream Chunk " + chunkKey + " was not found.");
575                 chunk.acquireMutex();
576                 try {
577                         return chunk.getUndoProcessor(clusters, chunkOffset, ccsId);
578                 } catch (DatabaseException e) {
579                     throw e;
580                 } catch (Throwable t) {
581                         throw new IllegalStateException(t);
582                 } finally {
583                         chunk.releaseMutex();
584                 }
585         }
586         
587         private void performUndo(String ccsId, ArrayList<Pair<ClusterUID, byte[]>> clusterChanges, UndoClusterSupport support) throws ProCoreException, DatabaseException, IllegalAcornStateException, AcornAccessVerificationException {
588                 UndoClusterUpdateProcessor proc = getUndoCSS(ccsId);
589
590                 int clusterKey = clusters.getClusterKeyByClusterUIDOrMakeWithoutMutex(proc.getClusterUID());
591
592                 clusters.clusterLRU.acquireMutex();
593                 try {
594
595                         ClusterChange cs = new ClusterChange(clusterChanges, proc.getClusterUID());
596                         for(int i=0;i<proc.entries.size();i++) {
597                                 
598                                 Entry e = proc.entries.get(proc.entries.size() - 1 - i);
599                                 e.process(clusters, cs, clusterKey);
600                         }
601                         cs.flush();
602
603                 } finally {
604                         clusters.clusterLRU.releaseMutex();
605                 }
606         }
607         
608         @Override
609         public boolean undo(long[] changeSetIds, OnChangeSetUpdate onChangeSetUpdate) throws SDBException {
610             try {
611                 final ArrayList<Pair<ClusterUID, byte[]>> clusterChanges = new ArrayList<Pair<ClusterUID, byte[]>>();
612                 
613                 UndoClusterSupport support = new UndoClusterSupport(clusters);
614                 
615                 final int changeSetId = clusters.state.headChangeSetId;
616                 
617                 if(ClusterUpdateProcessorBase.DEBUG)
618                         System.err.println(" === BEGIN UNDO ===");
619                 
620                 for(int i=0;i<changeSetIds.length;i++) {
621                         final long id = changeSetIds[changeSetIds.length-1-i];
622                         ArrayList<String> ccss = clusters.getChanges(id);
623     
624                         for(int j=0;j<ccss.size();j++) {
625                                 String ccsid = ccss.get(ccss.size()-j-1);
626                                 try {
627                                         if(ClusterUpdateProcessorBase.DEBUG)
628                                                 System.err.println("performUndo " + ccsid);
629                                         performUndo(ccsid, clusterChanges, support);
630                                 } catch (DatabaseException e) {
631                                         e.printStackTrace();
632                                 }
633                         }
634                 }
635     
636                 if(ClusterUpdateProcessorBase.DEBUG)
637                         System.err.println(" === END UNDO ===");
638     
639                 for(int i=0;i<clusterChanges.size();i++) {
640                         
641                         final int changeSetIndex = i;
642                         
643                         final Pair<ClusterUID, byte[]> pair = clusterChanges.get(i);
644                         
645                         final ClusterUID cuid = pair.first;
646                         final byte[] data = pair.second;
647     
648                         onChangeSetUpdate.onChangeSetUpdate(new ChangeSetUpdate() {
649     
650                                 @Override
651                                 public long getChangeSetId() {
652                                         return changeSetId;
653                                 }
654     
655                                 @Override
656                                 public int getChangeSetIndex() {
657                                         return 0;
658                                 }
659     
660                                 @Override
661                                 public int getNumberOfClusterChangeSets() {
662                                         return clusterChanges.size();
663                                 }
664     
665                                 @Override
666                                 public int getIndexOfClusterChangeSet() {
667                                         return changeSetIndex;
668                                 }
669     
670                                 @Override
671                                 public byte[] getClusterId() {
672                                         return cuid.asBytes();
673                                 }
674     
675                                 @Override
676                                 public boolean getNewCluster() {
677                                         return false;
678                                 }
679     
680                                 @Override
681                                 public byte[] getData() {
682                                         return data;
683                                 }
684     
685                         });
686                 }
687         } catch (AcornAccessVerificationException | IllegalAcornStateException e1) {
688             throw new ProCoreException(e1);
689         }
690                 return false;
691         }
692         
693         public ServiceLocator getServiceLocator() {
694             return locator;
695         }
696
697     @Override
698     public boolean refreshEnabled() {
699         return false;
700     }
701
702     @Override
703     public boolean rolledback() {
704         return clusters.rolledback();
705     }
706
707         
708         
709         
710         
711         
712         
713         
714         
715         
716         
717         ////////////////////////
718         
719         
720         
721         
722         
723         
724         
725         
726         
727         
728         
729         
730 }
731