001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.store.kahadb.plist; 018 019import java.io.DataInput; 020import java.io.DataOutput; 021import java.io.IOException; 022import java.util.Iterator; 023import java.util.Map; 024import java.util.Map.Entry; 025import java.util.NoSuchElementException; 026import java.util.Set; 027import java.util.concurrent.atomic.AtomicBoolean; 028import java.util.concurrent.atomic.AtomicReference; 029 030import org.apache.activemq.management.SizeStatisticImpl; 031import org.apache.activemq.store.PList; 032import org.apache.activemq.store.PListEntry; 033import org.apache.activemq.store.kahadb.disk.index.ListIndex; 034import org.apache.activemq.store.kahadb.disk.index.ListNode; 035import org.apache.activemq.store.kahadb.disk.journal.Location; 036import org.apache.activemq.store.kahadb.disk.page.Transaction; 037import org.apache.activemq.store.kahadb.disk.util.LocationMarshaller; 038import org.apache.activemq.store.kahadb.disk.util.StringMarshaller; 039import org.apache.activemq.util.ByteSequence; 040import org.slf4j.Logger; 041import org.slf4j.LoggerFactory; 042 043public class PListImpl extends ListIndex<String, Location> implements PList { 044 static final Logger LOG = LoggerFactory.getLogger(PListImpl.class); 045 final PListStoreImpl store; 046 private String name; 047 Object indexLock; 048 private final SizeStatisticImpl messageSize; 049 050 PListImpl(PListStoreImpl store) { 051 this.store = store; 052 this.indexLock = store.getIndexLock(); 053 setPageFile(store.getPageFile()); 054 setKeyMarshaller(StringMarshaller.INSTANCE); 055 setValueMarshaller(LocationMarshaller.INSTANCE); 056 057 messageSize = new SizeStatisticImpl("messageSize", "The size in bytes of the pending messages"); 058 messageSize.setEnabled(true); 059 } 060 061 public void setName(String name) { 062 this.name = name; 063 } 064 065 @Override 066 public String getName() { 067 return this.name; 068 } 069 070 void read(DataInput in) throws IOException { 071 setHeadPageId(in.readLong()); 072 } 073 074 public void write(DataOutput out) throws IOException { 075 out.writeLong(getHeadPageId()); 076 } 077 078 @Override 079 public synchronized void destroy() throws IOException { 080 synchronized (indexLock) { 081 this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() { 082 @Override 083 public void execute(Transaction tx) throws IOException { 084 clear(tx); 085 unload(tx); 086 } 087 }); 088 } 089 } 090 091 class Locator { 092 final String id; 093 094 Locator(String id) { 095 this.id = id; 096 } 097 098 PListImpl plist() { 099 return PListImpl.this; 100 } 101 } 102 103 @Override 104 public Object addLast(final String id, final ByteSequence bs) throws IOException { 105 final Location location = this.store.write(bs, false); 106 synchronized (indexLock) { 107 this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() { 108 @Override 109 public void execute(Transaction tx) throws IOException { 110 add(tx, id, location); 111 } 112 }); 113 } 114 return new Locator(id); 115 } 116 117 @Override 118 public Object addFirst(final String id, final ByteSequence bs) throws IOException { 119 final Location location = this.store.write(bs, false); 120 synchronized (indexLock) { 121 this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() { 122 @Override 123 public void execute(Transaction tx) throws IOException { 124 addFirst(tx, id, location); 125 } 126 }); 127 } 128 return new Locator(id); 129 } 130 131 @Override 132 public boolean remove(final Object l) throws IOException { 133 Locator locator = (Locator) l; 134 assert locator!=null; 135 assert locator.plist()==this; 136 return remove(locator.id); 137 } 138 139 public boolean remove(final String id) throws IOException { 140 final AtomicBoolean result = new AtomicBoolean(); 141 synchronized (indexLock) { 142 this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() { 143 @Override 144 public void execute(Transaction tx) throws IOException { 145 result.set(remove(tx, id) != null); 146 } 147 }); 148 } 149 return result.get(); 150 } 151 152 public boolean remove(final long position) throws IOException { 153 final AtomicBoolean result = new AtomicBoolean(); 154 synchronized (indexLock) { 155 this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() { 156 @Override 157 public void execute(Transaction tx) throws IOException { 158 Iterator<Map.Entry<String, Location>> iterator = iterator(tx, position); 159 if (iterator.hasNext()) { 160 iterator.next(); 161 iterator.remove(); 162 result.set(true); 163 } else { 164 result.set(false); 165 } 166 } 167 }); 168 } 169 return result.get(); 170 } 171 172 public PListEntry get(final long position) throws IOException { 173 PListEntry result = null; 174 final AtomicReference<Map.Entry<String, Location>> ref = new AtomicReference<Map.Entry<String, Location>>(); 175 synchronized (indexLock) { 176 this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() { 177 @Override 178 public void execute(Transaction tx) throws IOException { 179 Iterator<Map.Entry<String, Location>> iterator = iterator(tx, position); 180 ref.set(iterator.next()); 181 } 182 }); 183 } 184 if (ref.get() != null) { 185 ByteSequence bs = this.store.getPayload(ref.get().getValue()); 186 result = new PListEntry(ref.get().getKey(), bs, new Locator(ref.get().getKey())); 187 } 188 return result; 189 } 190 191 public PListEntry getFirst() throws IOException { 192 PListEntry result = null; 193 final AtomicReference<Map.Entry<String, Location>> ref = new AtomicReference<Map.Entry<String, Location>>(); 194 synchronized (indexLock) { 195 this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() { 196 @Override 197 public void execute(Transaction tx) throws IOException { 198 ref.set(getFirst(tx)); 199 } 200 }); 201 } 202 if (ref.get() != null) { 203 ByteSequence bs = this.store.getPayload(ref.get().getValue()); 204 result = new PListEntry(ref.get().getKey(), bs, new Locator(ref.get().getKey())); 205 } 206 return result; 207 } 208 209 public PListEntry getLast() throws IOException { 210 PListEntry result = null; 211 final AtomicReference<Map.Entry<String, Location>> ref = new AtomicReference<Map.Entry<String, Location>>(); 212 synchronized (indexLock) { 213 this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() { 214 @Override 215 public void execute(Transaction tx) throws IOException { 216 ref.set(getLast(tx)); 217 } 218 }); 219 } 220 if (ref.get() != null) { 221 ByteSequence bs = this.store.getPayload(ref.get().getValue()); 222 result = new PListEntry(ref.get().getKey(), bs, new Locator(ref.get().getKey())); 223 } 224 return result; 225 } 226 227 @Override 228 public boolean isEmpty() { 229 return size() == 0; 230 } 231 232 @Override 233 public PListIterator iterator() throws IOException { 234 return new PListIteratorImpl(); 235 } 236 237 final class PListIteratorImpl implements PListIterator { 238 final Iterator<Map.Entry<String, Location>> iterator; 239 final Transaction tx; 240 241 PListIteratorImpl() throws IOException { 242 tx = store.pageFile.tx(); 243 synchronized (indexLock) { 244 this.iterator = iterator(tx); 245 } 246 } 247 248 @Override 249 public boolean hasNext() { 250 return iterator.hasNext(); 251 } 252 253 @Override 254 public PListEntry next() { 255 Map.Entry<String, Location> entry = iterator.next(); 256 ByteSequence bs = null; 257 try { 258 bs = store.getPayload(entry.getValue()); 259 } catch (IOException unexpected) { 260 NoSuchElementException e = new NoSuchElementException(unexpected.getLocalizedMessage()); 261 e.initCause(unexpected); 262 throw e; 263 } 264 return new PListEntry(entry.getKey(), bs, new Locator(entry.getKey())); 265 } 266 267 @Override 268 public void remove() { 269 try { 270 synchronized (indexLock) { 271 tx.execute(new Transaction.Closure<IOException>() { 272 @Override 273 public void execute(Transaction tx) throws IOException { 274 iterator.remove(); 275 } 276 }); 277 } 278 } catch (IOException unexpected) { 279 IllegalStateException e = new IllegalStateException(unexpected); 280 e.initCause(unexpected); 281 throw e; 282 } 283 } 284 285 @Override 286 public void release() { 287 try { 288 tx.rollback(); 289 } catch (IOException unexpected) { 290 IllegalStateException e = new IllegalStateException(unexpected); 291 e.initCause(unexpected); 292 throw e; 293 } 294 } 295 } 296 297 public void claimFileLocations(final Set<Integer> candidates) throws IOException { 298 synchronized (indexLock) { 299 if (loaded.get()) { 300 this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() { 301 @Override 302 public void execute(Transaction tx) throws IOException { 303 Iterator<Map.Entry<String,Location>> iterator = iterator(tx); 304 while (iterator.hasNext()) { 305 Location location = iterator.next().getValue(); 306 candidates.remove(location.getDataFileId()); 307 } 308 } 309 }); 310 } 311 } 312 } 313 314 @Override 315 public long messageSize() { 316 return messageSize.getTotalSize(); 317 } 318 319 @Override 320 public synchronized Location add(Transaction tx, String key, Location value) 321 throws IOException { 322 Location location = super.add(tx, key, value); 323 messageSize.addSize(value.getSize()); 324 return location; 325 } 326 327 @Override 328 public synchronized Location addFirst(Transaction tx, String key, 329 Location value) throws IOException { 330 Location location = super.addFirst(tx, key, value); 331 messageSize.addSize(value.getSize()); 332 return location; 333 } 334 335 @Override 336 public synchronized void clear(Transaction tx) throws IOException { 337 messageSize.reset(); 338 super.clear(tx); 339 } 340 341 @Override 342 protected synchronized void onLoad(ListNode<String, Location> node, Transaction tx) { 343 try { 344 Iterator<Entry<String, Location>> i = node.iterator(tx); 345 while (i.hasNext()) { 346 messageSize.addSize(i.next().getValue().getSize()); 347 } 348 } catch (IOException e) { 349 LOG.warn("could not increment message size", e); 350 } 351 } 352 353 @Override 354 public void onRemove(Entry<String, Location> removed) { 355 super.onRemove(removed); 356 if (removed != null) { 357 messageSize.addSize(-removed.getValue().getSize()); 358 } 359 } 360 361 @Override 362 public String toString() { 363 return name + "[headPageId=" + getHeadPageId() + ",tailPageId=" + getTailPageId() + ", size=" + size() + "]"; 364 } 365}