質問編集履歴

2

追記

2019/04/07 23:15

投稿

tomohiro.cloud
tomohiro.cloud

スコア17

test CHANGED
File without changes
test CHANGED
@@ -225,3 +225,287 @@
225
225
  }
226
226
 
227
227
  ```
228
+
229
+
230
+
231
+ Lambda関数
232
+
233
+ ```
234
+
235
+ /*
236
+
237
+ * Sample node.js code for AWS Lambda to get Apache log files from S3, parse
238
+
239
+ * and add them to an Amazon Elasticsearch Service domain.
240
+
241
+ *
242
+
243
+ *
244
+
245
+ * Copyright 2015- Amazon.com, Inc. or its affiliates. All Rights Reserved.
246
+
247
+ *
248
+
249
+ * Licensed under the Amazon Software License (the "License").
250
+
251
+ * You may not use this file except in compliance with the License.
252
+
253
+ * A copy of the License is located at http://aws.amazon.com/asl/
254
+
255
+ * or in the "license" file accompanying this file. This file is distributed
256
+
257
+ * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
258
+
259
+ * express or implied. See the License for the specific language governing
260
+
261
+ * permissions and limitations under the License.
262
+
263
+ */
264
+
265
+
266
+
267
+ /* Imports */
268
+
269
+ var AWS = require('aws-sdk');
270
+
271
+ var LineStream = require('byline').LineStream;
272
+
273
+ var parse = require('elb-log-parser'); // Apache Common Log Format
274
+
275
+ var path = require('path');
276
+
277
+ var stream = require('stream');
278
+
279
+
280
+
281
+ /* Globals */
282
+
283
+ var esDomain = {
284
+
285
+ endpoint: 'https://search-test20190406-qludo2zvsek2bhjmfirgynljyi.us-east-1.es.amazonaws.com',
286
+
287
+ region: 'us-east-1',
288
+
289
+ index: 'logs',
290
+
291
+ doctype: 'elb'
292
+
293
+ };
294
+
295
+ var endpoint = new AWS.Endpoint(esDomain.endpoint);
296
+
297
+ var s3 = new AWS.S3();
298
+
299
+ var totLogLines = 0; // Total number of log lines in the file
300
+
301
+ var numDocsAdded = 0; // Number of log lines added to ES so far
302
+
303
+
304
+
305
+ /*
306
+
307
+ * The AWS credentials are picked up from the environment.
308
+
309
+ * They belong to the IAM role assigned to the Lambda function.
310
+
311
+ * Since the ES requests are signed using these credentials,
312
+
313
+ * make sure to apply a policy that permits ES domain operations
314
+
315
+ * to the role.
316
+
317
+ */
318
+
319
+ var creds = new AWS.EnvironmentCredentials('AWS');
320
+
321
+
322
+
323
+ /*
324
+
325
+ * Get the log file from the given S3 bucket and key. Parse it and add
326
+
327
+ * each log record to the ES domain.
328
+
329
+ */
330
+
331
+ function s3LogsToES(bucket, key, context, lineStream, recordStream) {
332
+
333
+ // Note: The Lambda function should be configured to filter for .log files
334
+
335
+ // (as part of the Event Source "suffix" setting).
336
+
337
+
338
+
339
+ var s3Stream = s3.getObject({Bucket: bucket, Key: key}).createReadStream();
340
+
341
+
342
+
343
+ // Flow: S3 file stream -> Log Line stream -> Log Record stream -> ES
344
+
345
+ s3Stream
346
+
347
+ .pipe(lineStream)
348
+
349
+ .pipe(recordStream)
350
+
351
+ .on('data', function(parsedEntry) {
352
+
353
+ postDocumentToES(parsedEntry, context);
354
+
355
+ });
356
+
357
+
358
+
359
+ s3Stream.on('error', function() {
360
+
361
+ console.log(
362
+
363
+ 'Error getting object "' + key + '" from bucket "' + bucket + '". ' +
364
+
365
+ 'Make sure they exist and your bucket is in the same region as this function.');
366
+
367
+ context.fail();
368
+
369
+ });
370
+
371
+ }
372
+
373
+
374
+
375
+ /*
376
+
377
+ * Add the given document to the ES domain.
378
+
379
+ * If all records are successfully added, indicate success to lambda
380
+
381
+ * (using the "context" parameter).
382
+
383
+ */
384
+
385
+ function postDocumentToES(doc, context) {
386
+
387
+ var req = new AWS.HttpRequest(endpoint);
388
+
389
+
390
+
391
+ req.method = 'POST';
392
+
393
+ req.path = path.join('/', esDomain.index, esDomain.doctype);
394
+
395
+ req.region = esDomain.region;
396
+
397
+ req.body = doc;
398
+
399
+ req.headers['presigned-expires'] = false;
400
+
401
+ req.headers['Host'] = endpoint.host;
402
+
403
+
404
+
405
+ // Sign the request (Sigv4)
406
+
407
+ var signer = new AWS.Signers.V4(req, 'es');
408
+
409
+ signer.addAuthorization(creds, new Date());
410
+
411
+
412
+
413
+ // Post document to ES
414
+
415
+ var send = new AWS.NodeHttpClient();
416
+
417
+ send.handleRequest(req, null, function(httpResp) {
418
+
419
+ var body = '';
420
+
421
+ httpResp.on('data', function (chunk) {
422
+
423
+ body += chunk;
424
+
425
+ });
426
+
427
+ httpResp.on('end', function (chunk) {
428
+
429
+ numDocsAdded ++;
430
+
431
+ if (numDocsAdded === totLogLines) {
432
+
433
+ // Mark lambda success. If not done so, it will be retried.
434
+
435
+ console.log('All ' + numDocsAdded + ' log records added to ES.');
436
+
437
+ context.succeed();
438
+
439
+ }
440
+
441
+ });
442
+
443
+ }, function(err) {
444
+
445
+ console.log('Error: ' + err);
446
+
447
+ console.log(numDocsAdded + 'of ' + totLogLines + ' log records added to ES.');
448
+
449
+ context.fail();
450
+
451
+ });
452
+
453
+ }
454
+
455
+
456
+
457
+ /* Lambda "main": Execution starts here */
458
+
459
+ exports.handler = function(event, context) {
460
+
461
+ console.log('Received event: ', JSON.stringify(event, null, 2));
462
+
463
+
464
+
465
+ /* == Streams ==
466
+
467
+ * To avoid loading an entire (typically large) log file into memory,
468
+
469
+ * this is implemented as a pipeline of filters, streaming log data
470
+
471
+ * from S3 to ES.
472
+
473
+ * Flow: S3 file stream -> Log Line stream -> Log Record stream -> ES
474
+
475
+ */
476
+
477
+ var lineStream = new LineStream();
478
+
479
+ // A stream of log records, from parsing each log line
480
+
481
+ var recordStream = new stream.Transform({objectMode: true})
482
+
483
+ recordStream._transform = function(line, encoding, done) {
484
+
485
+ var logRecord = parse(line.toString());
486
+
487
+ var serializedRecord = JSON.stringify(logRecord);
488
+
489
+ this.push(serializedRecord);
490
+
491
+ totLogLines ++;
492
+
493
+ done();
494
+
495
+ }
496
+
497
+
498
+
499
+ event.Records.forEach(function(record) {
500
+
501
+ var bucket = record.s3.bucket.name;
502
+
503
+ var objKey = decodeURIComponent(record.s3.object.key.replace(/\+/g, ' '));
504
+
505
+ s3LogsToES(bucket, objKey, context, lineStream, recordStream);
506
+
507
+ });
508
+
509
+ }
510
+
511
+ ```

1

情報の追記

2019/04/07 23:15

投稿

tomohiro.cloud
tomohiro.cloud

スコア17

test CHANGED
File without changes
test CHANGED
@@ -16,7 +16,7 @@
16
16
 
17
17
 
18
18
 
19
-
19
+ LambdaのテストはELBのアクセスログから作りました。
20
20
 
21
21
  実行結果:失敗ログ
22
22