Apache Spark та Amazon S3 - Gotchas та кращі практики

S3 є сховищем об'єктів, а не файловою системою, отже, проблеми, що виникають у результаті можливої ​​послідовності, неатомні перейменування повинні вирішуватися в коді програми. Сервер каталогів у файловій системі був замінений хеш-алгоритмом імені файлу. Це погано для перерахування речей, операцій з каталогами, видалення та перейменування (копіювання та видалення, оскільки технічно немає перейменування в магазинах об'єктів)

Почніть використовувати S3A (схема URI: s3a: //) - Hadoop 2.7+. S3a є рекомендованим клієнтом S3 для Hadoop 2.7 і пізніших версій, S3a є більш ефективним і підтримує більші файли (до 5 ТБ) та підтримує завантаження з декількох частин. Усі об’єкти, доступні з s3n: // URL-адреси, також повинні бути доступні з s3a, просто замінивши схему URL-адрес. Більшість звітів про помилки щодо S3N закриються як WONTFIX

Зробити Spark 2.0.1 працювати з S3a Для Spark 2.0.1 використовуйте hadoop-aws-2.7.3.jar, aws-java-sdk-1.7.4.jar, joda-time-2.9.3.jar у своєму класі; не забудьте оновити spark-default.conf за допомогою клавіш AWS та S3A FileSystemClass

Spark.hadoop.fs.s3a.access.key XXXXXXX
spark.hadoop.fs.s3a.secret.key XXXXXXX
spark.hadoop.fs.s3a.impl org.apache.hadoop.fs.s3a.S3AFileSystem

Однозначно використовуйте Dataframes для переупорядкування запитів, а вимкнення натискань доступно поза коробкою, і, отже, отримується менше даних, з часом прискорюючи ваші запити

Якщо ви читаєте одні й ті самі дані кілька разів, спробуйте скористатися .cache або s3distcp для передачі файлів у ваш локальний кластер EMR, щоб отримати кращу ефективність читання файлів у реальній файловій системі. Варіант groupBy s3distcp - це чудовий варіант вирішити проблему невеликих файлів шляхом об’єднання великої кількості невеликих файлів.

Що підводить мене до питання читання великої кількості невеликих файлів. Якщо об'єднання файлів за допомогою інструменту не є можливим, спробуйте наступний код, який ефективно працює навколо повільного вузького списку каталогів S3

імпортувати com.amazonaws.services.s3._, model._
    імпортувати com.amazonaws.auth.BasicAWSCреєстрації

    val запит = новий ListObjectsRequest ()
    request.setBucketName (відро)
    request.setPrefix (префікс)
    request.setMaxKeys (pageLength)
    def s3 = новий AmazonS3Client (нові BasicAWSCдослідження (ключ, секрет))

    val objs = s3.listObjects (request) // Зауважте, що цей метод повертає усічені дані, якщо довше, ніж "pageLength" вище. Можливо, вам доведеться розібратися з цим.
    sc.parallelize (objs.getObjectSummaries.map (_. getKey) .toList)
        .flatMap {key => Source.fromInputStream (s3.getObject (відро, ключ) .getObjectContent: InputStream) .getLines}

Переконайтесь, що параметр spark.sql.parquet.filterPushdown є істинним, а spark.sql.parquet.mergeSchema помилковим (щоб уникнути злиття схеми під час запису, що дійсно сповільнює стадію запису). На щастя, Spark 2.0 має правильний стандарт

Ви замислювались, чому саме в той час, коли робота збирається завершити, нічого не записується в журнали, і всі іскрові операції, схоже, припинилися, але результати ще не є у вихідному каталозі S3 ... що відбувається? Щоразу, коли виконавці записують результат завдання, кожен з них записує у тимчасовий каталог за межами головного каталогу, куди повинні були бути записані файли, і як тільки всі виконавці будуть зроблені, перейменування робиться для отримання атомної ексклюзивності. Це все добре в стандартній файловій системі, наприклад hdfs, де перейменування миттєві, але в сховищі об'єктів, як S3, це не сприятливо, оскільки перейменування на S3 виконуються зі швидкістю 6 Мб / с.

Якщо можливо, напишіть вихідні завдання в hdfs EMR (щоб скористатися майже миттєвими перейменами та кращим IO файлів локальних hdfs) та додайте крок dstcp для переміщення файлів у S3, щоб заощадити всі проблеми з обробкою внутрішніх магазин об'єктів, що намагається бути файловою системою. Крім того, запис на локальний hdfs дозволить вам дозволити спекуляціям керувати утікаючими завданнями без попадання в тупикові пастки, пов'язані з DirectOutputCommiter.

Якщо ви повинні використовувати S3 в якості вихідного каталогу, переконайтеся, що встановлені наступні конфігурації Spark

spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version 2
spark.speculation false

Примітка: DirectParquetOutputCommitter видаляється з Spark 2.0 через ймовірність втрати даних. На жаль, поки ми не покращили узгодженість з S3a, нам доведеться працювати з обхідними шляхами. З Hadoop 2.8 все покращується

Уникайте ключових слів у лексикографічному порядку. Можна обійтися хеш-/ випадковими префіксами або зворотним часом-датою, щоб обійти їх. Трюк полягає в тому, щоб назвати свої ключі ієрархічно, поставивши найпоширеніші речі, за якими ви фільтруєте, ліворуч від вашої клавіші. І ніколи не слід підкреслювати назви відра через проблеми з DNS.

Увімкнення fs.s3a.fast.upload паралельно завантажувати частини одного файлу в Amazon S3

Добре, що це був мозковий відвал проблем у виробництві, які я нещодавно вирішував, щоб Spark працював із S3. Слідкуйте за додатковими відомостями щодо цього, коли я копаю глибше в наступній публікації ...