1: <?php
2: /**
3: * Copyright 2012-2014 Rackspace US, Inc.
4: *
5: * Licensed under the Apache License, Version 2.0 (the "License");
6: * you may not use this file except in compliance with the License.
7: * You may obtain a copy of the License at
8: *
9: * http://www.apache.org/licenses/LICENSE-2.0
10: *
11: * Unless required by applicable law or agreed to in writing, software
12: * distributed under the License is distributed on an "AS IS" BASIS,
13: * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14: * See the License for the specific language governing permissions and
15: * limitations under the License.
16: */
17:
18: namespace OpenCloud\Queues\Resource;
19:
20: use Guzzle\Http\Url;
21: use OpenCloud\Common\Collection\PaginatedIterator;
22: use OpenCloud\Common\Http\Message\Formatter;
23: use OpenCloud\Common\Resource\PersistentResource;
24: use OpenCloud\Queues\Exception;
25: use OpenCloud\Queues\Collection\MessageIterator;
26: use OpenCloud\Common\Metadata;
27:
28: /**
29: * A queue holds messages. Ideally, a queue is created per work type. For example,
30: * if you want to compress files, you would create a queue dedicated to this job.
31: * Any application that reads from this queue would only compress files.
32: */
33: class Queue extends PersistentResource
34: {
35: /**
36: * Maximum number of messages that can be posted at once.
37: */
38: const MAX_POST_MESSAGES = 10;
39:
40: /**
41: * The name given to the queue. The name MUST NOT exceed 64 bytes in length,
42: * and is limited to US-ASCII letters, digits, underscores, and hyphens.
43: *
44: * @var string
45: */
46: private $name;
47:
48: /**
49: * Miscellaneous, user-defined information about the queue.
50: *
51: * @var array|Metadata
52: */
53: protected $metadata;
54:
55: /**
56: * Populated when the service's listQueues() method is called. Provides a
57: * convenient link for a particular Queue.md.
58: *
59: * @var string
60: */
61: private $href;
62:
63: protected static $url_resource = 'queues';
64: protected static $json_collection_name = 'queues';
65: protected static $json_name = false;
66:
67: public $createKeys = array('name');
68:
69: /**
70: * Set the name (with validation).
71: *
72: * @param $name string
73: * @return $this
74: * @throws \OpenCloud\Queues\Exception\QueueException
75: */
76: public function setName($name)
77: {
78: if (preg_match('#[^\w\d\-\_]+#', $name)) {
79: throw new Exception\QueueException(sprintf(
80: 'Queues names are restricted to alphanumeric characters, '
81: . ' hyphens and underscores. You provided: %s',
82: print_r($name, true)
83: ));
84: }
85:
86: $this->name = $name;
87:
88: return $this;
89: }
90:
91: /**
92: * @return string
93: */
94: public function getName()
95: {
96: return $this->name;
97: }
98:
99: /**
100: * Save this metadata both to the local object and the API.
101: *
102: * @param array $params
103: * @return mixed
104: */
105: public function saveMetadata(array $params = array())
106: {
107: if (!empty($params)) {
108: $this->setMetadata($params);
109: }
110:
111: $json = json_encode((object) $this->getMetadata()->toArray());
112:
113: return $this->getClient()->put($this->getUrl('metadata'), self::getJsonHeader(), $json)->send();
114: }
115:
116: /**
117: * Retrieve metadata from the API and set it to the local object.
118: */
119: public function retrieveMetadata()
120: {
121: $response = $this->getClient()->get($this->url('metadata'))->send();
122:
123: $metadata = new Metadata();
124: $metadata->setArray(Formatter::decode($response));
125: $this->setMetadata($metadata);
126: }
127:
128: public function create($params = array())
129: {
130: return $this->getService()->createQueue($params);
131: }
132:
133: public function createJson()
134: {
135: return (object) array(
136: 'queue_name' => $this->getName(),
137: 'metadata' => $this->getMetadata(false)
138: );
139: }
140:
141: public function primaryKeyField()
142: {
143: return 'name';
144: }
145:
146: public function update($params = array())
147: {
148: return $this->noUpdate();
149: }
150:
151: /**
152: * This operation returns queue statistics including how many messages are
153: * in the queue, broken out by status.
154: *
155: * @return object
156: */
157: public function getStats()
158: {
159: $response = $this->getClient()
160: ->get($this->getUrl('stats'))
161: ->send();
162:
163: $body = Formatter::decode($response);
164:
165: return (!isset($body->messages)) ? false : $body->messages;
166: }
167:
168: /**
169: * Gets a message either by a specific ID, or, if no ID is specified, just
170: * an empty Message object.
171: *
172: * @param string|null $id If a string, then the service will retrieve an
173: * individual message based on its specific ID. If NULL, then an empty
174: * object is returned for further use.
175: * @return Message
176: */
177: public function getMessage($id = null)
178: {
179: return $this->getService()->resource('Message', $id, $this);
180: }
181:
182: /**
183: * Post an individual message.
184: *
185: * @param array $params
186: * @return bool
187: */
188: public function createMessage(array $params)
189: {
190: return $this->createMessages(array($params));
191: }
192:
193: /**
194: * Post multiple messages.
195: *
196: * @param array $messages
197: * @return bool
198: */
199: public function createMessages(array $messages)
200: {
201: $objects = array();
202:
203: foreach ($messages as $dataArray) {
204: $objects[] = $this->getMessage($dataArray)->createJson();
205: }
206:
207: $json = json_encode(array_slice($objects, 0, self::MAX_POST_MESSAGES));
208: $this->checkJsonError();
209:
210: $response = $this->getClient()
211: ->post($this->getUrl('messages'), self::getJsonHeader(), $json)
212: ->send();
213:
214: if (null !== ($location = $response->getHeader('Location'))) {
215: $parts = array_merge($this->getUrl()->getParts(), parse_url($location));
216: $url = Url::factory(Url::buildUrl($parts));
217:
218: $options = $this->makeResourceIteratorOptions(__NAMESPACE__ . '\\Message') + array(
219: 'baseUrl' => $url,
220: 'limit.page' => 10
221: );
222:
223: return PaginatedIterator::factory($this, $options);
224: }
225:
226: return true;
227: }
228:
229: /**
230: * Lists messages according to certain filter options. Results are ordered
231: * by age, oldest message first. All of the parameters are optional.
232: *
233: * @param array $options An associative array of filtering parameters:
234: *
235: * - ids (array) A two-dimensional array of IDs to retrieve.
236: *
237: * - claim_id (string) The claim ID to which the message is associated.
238: *
239: * - marker (string) An opaque string that the client can use to request the
240: * next batch of messages. If not specified, the API will return all
241: * messages at the head of the queue (up to limit).
242: *
243: * - limit (integer) A number up to 20 (the default, but is configurable)
244: * queues to return. If not specified, it defaults to 10.
245: *
246: * - echo (bool) Determines whether the API returns a client's own messages,
247: * as determined by the uuid portion of the User-Agent header. If not
248: * specified, echo defaults to FALSE.
249: *
250: * - include_claimed (bool) Determines whether the API returns claimed
251: * messages as well as unclaimed messages. If not specified, defaults
252: * to FALSE (i.e. only unclaimed messages are returned).
253: *
254: * @return \OpenCloud\Queues\Collection\MessageIterator
255: */
256: public function listMessages(array $options = array())
257: {
258: // Implode array into delimeted string if necessary
259: if (isset($options['ids']) && is_array($options['ids'])) {
260: $options['ids'] = implode(',', $options['ids']);
261: }
262:
263: // PHP will cast boolean values to integers (true => 1, false => 0) but
264: // the Queues REST API expects strings as query parameters ("true" and "false").
265: foreach ($options as $optionKey => $optionValue) {
266: if (true === $optionValue) {
267: $options[$optionKey] = "true";
268: } elseif (false === $optionValue) {
269: $options[$optionKey] = "false";
270: }
271: }
272:
273: $url = $this->getUrl('messages', $options);
274:
275: $options = $this->makeResourceIteratorOptions(__NAMESPACE__ . '\\Message') + array(
276: 'baseUrl' => $url,
277: 'limit.page' => 10
278: );
279:
280: return MessageIterator::factory($this, $options);
281: }
282:
283: /**
284: * This operation immediately deletes the specified messages, providing a
285: * means for bulk deletes.
286: *
287: * @param array $ids Two-dimensional array of IDs to be deleted
288: * @return boolean
289: */
290: public function deleteMessages(array $ids)
291: {
292: $url = $this->url('messages', array('ids' => implode(',', $ids)));
293: $this->getClient()->delete($url)->send();
294:
295: return true;
296: }
297:
298: /**
299: * This operation claims a set of messages, up to limit, from oldest to
300: * newest, skipping any that are already claimed. If no unclaimed messages
301: * are available, FALSE is returned.
302: *
303: * You should delete the message when you have finished processing it,
304: * before the claim expires, to ensure the message is processed only once.
305: * Be aware that you must call the delete() method on the Message object and
306: * pass in the Claim ID, rather than relying on the service's bulk delete
307: * deleteMessages() method. This is so that the server can return an error
308: * if the claim just expired, giving you a chance to roll back your processing
309: * of the given message, since another worker will likely claim the message
310: * and process it.
311: *
312: * Just as with a message's age, the age given for the claim is relative to
313: * the server's clock, and is useful for determining how quickly messages are
314: * getting processed, and whether a given message's claim is about to expire.
315: *
316: * When a claim expires, it is removed, allowing another client worker to
317: * claim the message in the case that the original worker fails to process it.
318: *
319: * @param int $limit
320: */
321: public function claimMessages(array $options = array())
322: {
323: $limit = (isset($options['limit'])) ? $options['limit'] : Claim::LIMIT_DEFAULT;
324: $grace = (isset($options['grace'])) ? $options['grace'] : Claim::GRACE_DEFAULT;
325: $ttl = (isset($options['ttl'])) ? $options['ttl'] : Claim::TTL_DEFAULT;
326:
327: $json = json_encode((object) array(
328: 'grace' => $grace,
329: 'ttl' => $ttl
330: ));
331:
332: $url = $this->getUrl('claims', array('limit' => $limit));
333:
334: $response = $this->getClient()->post($url, self::getJsonHeader(), $json)->send();
335:
336: if ($response->getStatusCode() == 204) {
337: return false;
338: }
339:
340: $options = array('resourceClass' => 'Message', 'baseUrl' => $url);
341:
342: return PaginatedIterator::factory($this, $options, Formatter::decode($response));
343: }
344:
345: /**
346: * If an ID is supplied, the API is queried for a persistent object; otherwise
347: * an empty object is returned.
348: *
349: * @param int $id
350: * @return Claim
351: */
352: public function getClaim($id = null)
353: {
354: return $this->getService()->resource('Claim', $id, $this);
355: }
356: }
357: