```
sbt new spotify/scio.g8
target/pack/bin/word-count --output=wc
cat wc/part-00000-of-00001.txt
```

```scala
import com.spotify.scio._

/*
sbt "runMain [PACKAGE].WordCount
  --project=[PROJECT] --runner=DataflowRunner --zone=[ZONE]
  --input=gs://dataflow-samples/shakespeare/kinglear.txt
  --output=gs://[BUCKET]/[PATH]/wordcount"
*/

object WordCount {
  def main(cmdlineArgs: Array[String]): Unit = {
    val (sc, args) = ContextAndArgs(cmdlineArgs)

    val exampleData = "gs://dataflow-samples/shakespeare/kinglear.txt"
    val input = args.getOrElse("input", exampleData)
    val output = args("output")

    sc.textFile(input)
      .map(_.trim)
      .flatMap(_.split("[^a-zA-Z']+").filter(_.nonEmpty))
      .countByValue
      .map(t => t._1 + ": " + t._2)
      .saveAsTextFile(output)

    val result = sc.close().waitUntilFinish()
  }
}
```
# Scio, Beam and Dataflow
Check out the Beam Programming Guide first for a detailed explanation of the Beam programming model and concepts. Also see this comparison between Scio, Scalding and Spark APIs.
Scio aims to be a thin wrapper on top of Beam while offering an idiomatic Scala-style API.
## Basics
- `ScioContext` wraps `Pipeline`
- `SCollection` wraps `PCollection`
- `ScioResult` wraps `PipelineResult`
- Most `PTransform`s are implemented as idiomatic Scala methods on `SCollection`, e.g. `map`, `flatMap`, `filter`, `reduce`.
- `PairSCollectionFunctions` and `DoubleSCollectionFunctions` are specialized versions of `SCollection` implemented via the Scala “pimp my library” pattern.
- An `SCollection[(K, V)]` is automatically converted to a `PairSCollectionFunctions`, which provides key-value operations, e.g. `groupByKey`, `reduceByKey`, `cogroup`, `join`.
- An `SCollection[Double]` is automatically converted to a `DoubleSCollectionFunctions`, which provides statistical operations, e.g. `stddev`, `variance` (see the sketch after this list).
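To make these conversions concrete, here is a minimal sketch, not taken from the Scio docs: the flag names (`--input`, `--output`) are illustrative, and it assumes a recent Scio release where the pipeline is started with `sc.run()` (older releases used `sc.close()`):

```scala
import com.spotify.scio._
import com.spotify.scio.values.SCollection

object BasicsExample {
  def main(cmdlineArgs: Array[String]): Unit = {
    val (sc, args) = ContextAndArgs(cmdlineArgs)

    // Plain SCollection methods: map, flatMap, filter.
    val words: SCollection[String] =
      sc.textFile(args("input"))
        .flatMap(_.split("\\s+"))
        .filter(_.nonEmpty)

    // An SCollection[(K, V)] picks up PairSCollectionFunctions, e.g. reduceByKey.
    val counts: SCollection[(String, Long)] =
      words.map(w => (w, 1L)).reduceByKey(_ + _)

    // An SCollection[Double] picks up DoubleSCollectionFunctions, e.g. stddev.
    val lengthStddev: SCollection[Double] =
      words.map(_.length.toDouble).stddev

    counts.map { case (w, c) => s"$w: $c" }.saveAsTextFile(args("output"))
    lengthStddev.map(d => s"stddev of word length: $d")
      .saveAsTextFile(args("output") + "-stats")

    sc.run()
  }
}
```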
## ScioContext, PipelineOptions and ScioResult
- Beam uses `PipelineOptions` and its subclasses to parse command line arguments. Users have to extend the interface for their application-level arguments.
- `ScioContext` has a `parseArguments` method that takes an `Array[String]` of command line arguments, parses Beam-specific ones into a `PipelineOptions` and application-specific ones into an `Args`, and returns the `(PipelineOptions, Args)`.
- `ContextAndArgs` is a shortcut to create a `(ScioContext, Args)` (see the first sketch after this list).
- `ScioResult` can be used to access accumulator values and job state.
- Most IO Read transforms are implemented as methods on `ScioContext`, e.g. `avroFile`, `textFile`, `bigQueryTable`.
- Most IO Write transforms are implemented as methods on `SCollection`, e.g. `saveAsAvroFile`, `saveAsTextFile`, `saveAsBigQueryTable`.
- These IO operations also detect when the `ScioContext` is running in a `JobTest` and manage test IO in memory.
- Write operations also return a `ClosedTap`. Once the job completes you can open the `Tap`. `Tap` abstracts away the logic of reading the dataset directly as an `Iterator[T]` or re-opening it in another `ScioContext`. The `Tap` becomes available once the job finishes. This can be used to do lightweight pipeline orchestration, e.g. WordCountOrchestration.scala (see the second sketch after this list).
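For the argument-parsing bullets above, a minimal sketch of `ContextAndArgs` in use; the flag names are only examples, and the `sc.run()` call assumes a recent Scio release:

```scala
import com.spotify.scio._

object ArgsExample {
  def main(cmdlineArgs: Array[String]): Unit = {
    // ContextAndArgs parses Beam-specific flags into PipelineOptions (used to
    // build the ScioContext) and application-specific flags into Args.
    val (sc, args) = ContextAndArgs(cmdlineArgs)

    // Args behaves like a map of application flags, e.g. --input=... --output=...
    val input = args.getOrElse("input", "gs://dataflow-samples/shakespeare/kinglear.txt")
    val output = args("output")

    sc.textFile(input).saveAsTextFile(output)
    sc.run()
  }
}
```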
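And for the `ClosedTap` bullet, a rough sketch of reading results back after the job finishes; it assumes a Scio version where `sc.run().waitUntilFinish()` returns a `ScioResult` whose `tap` method materializes a `ClosedTap`, and the output paths are illustrative:

```scala
import com.spotify.scio._

object TapExample {
  def main(cmdlineArgs: Array[String]): Unit = {
    val (sc, args) = ContextAndArgs(cmdlineArgs)

    // saveAsTextFile returns a ClosedTap handle for the written dataset.
    val closedTap = sc
      .parallelize(Seq("a", "b", "c"))
      .saveAsTextFile(args("output"))

    // Block until the job is done, then materialize the Tap.
    val result = sc.run().waitUntilFinish()
    val tap = result.tap(closedTap)

    // Read the dataset directly as an Iterator[String] ...
    tap.value.foreach(println)

    // ... or re-open it in another ScioContext for a follow-up job.
    val (sc2, _) = ContextAndArgs(cmdlineArgs)
    tap.open(sc2).map(_.toUpperCase).saveAsTextFile(args("output") + "-upper")
    sc2.run()
  }
}
```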
## ByKey operations
- Beam `ByKey` transforms require `PCollection[KV[K, V]]` inputs while Scio uses `SCollection[(K, V)]`.
- Hence every `ByKey` transform in `PairSCollectionFunctions` converts the Scala `(K, V)` to `KV[K, V]` before the transform and back afterwards. However, these are lightweight wrappers and the JVM should be able to optimize them.
- `PairSCollectionFunctions` also converts `java.lang.Iterable[V]` and `java.util.List[V]` to `scala.Iterable[V]` in some cases. These conversions stay invisible in user code, as the sketch after this list shows.
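A small sketch with illustrative data, showing that `join` and `groupByKey` are called directly on Scala tuples; the `KV` round-tripping described above happens inside Scio:

```scala
import com.spotify.scio._
import com.spotify.scio.values.SCollection

object ByKeyExample {
  def main(cmdlineArgs: Array[String]): Unit = {
    val (sc, _) = ContextAndArgs(cmdlineArgs)

    // Plain Scala tuples; no manual KV[K, V] wrapping is needed.
    val clicks: SCollection[(String, Int)] =
      sc.parallelize(Seq(("user1", 3), ("user2", 1), ("user1", 2)))
    val countries: SCollection[(String, String)] =
      sc.parallelize(Seq(("user1", "SE"), ("user2", "US")))

    // groupByKey yields a Scala Iterable[V], not a java.lang.Iterable[V].
    val grouped: SCollection[(String, Iterable[Int])] = clicks.groupByKey

    // join also works directly on (K, V) tuples.
    val joined: SCollection[(String, (Int, String))] = clicks.join(countries)

    joined.map { case (user, (n, country)) => s"$user,$n,$country" }
      .saveAsTextFile("bykey-example")  // illustrative output path

    sc.run()
  }
}
```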
## Coders
- Beam/Dataflow uses `Coder`s for (de)serializing elements in a `PCollection` during shuffle. There are built-in coders for Java primitive types, collections, and common GCP types like Avro, ProtoBuf, BigQuery `TableRow` and Datastore `Entity`.
- `PCollection` uses `TypeToken` from Guava reflection to work around Java type erasure and retrieve type information of its elements. This may not always work, but there is a `PCollection#setCoder` method to override the coder.
- Twitter’s chill library uses Kryo to (de)serialize data. Chill includes serializers for common Scala types and can also automatically derive serializers for arbitrary objects. Scio falls back to `KryoAtomicCoder` when a built-in coder isn’t available.
- A coder may be non-deterministic if `Coder#verifyDeterministic` throws an exception. Any data type with such a coder cannot be used as a key in `ByKey` operations. However, `KryoAtomicCoder` assumes all types are deterministic for simplicity, so it is up to the user to avoid non-deterministic types as keys, e.g. tuples or case classes with doubles (see the sketch after this list).
- Avro `GenericRecord` requires a schema during deserialization (available via `GenericRecord#getSchema` for serialization) and `AvroCoder` also requires it during initialization. This is not possible in `KryoAtomicCoder`, i.e. when nesting a `GenericRecord` inside a Scala type. Instead, `KryoAtomicCoder` serializes the schema before every record so that records can round-trip safely. This is not optimal but is the only way without requiring the user to handcraft a custom coder.
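To illustrate the determinism caveat, a hedged sketch with made-up record and field names: keying on a case class that contains a `Double` makes key equality depend on the encoded bytes of the record, so deriving a stable key such as a `String` is the safer choice:

```scala
import com.spotify.scio._
import com.spotify.scio.values.SCollection

// Illustrative record type; the Double field makes it a poor ByKey key.
case class Measurement(sensorId: String, reading: Double)

object CoderKeyExample {
  def main(cmdlineArgs: Array[String]): Unit = {
    val (sc, _) = ContextAndArgs(cmdlineArgs)

    val measurements: SCollection[Measurement] = sc.parallelize(
      Seq(Measurement("a", 1.0), Measurement("a", 2.0), Measurement("b", 0.5))
    )

    // Risky: keying on the whole case class makes grouping depend on the
    // encoded bytes of a Double, which cannot be verified as deterministic.
    // val byRecord = measurements.map(m => (m, 1L)).reduceByKey(_ + _)

    // Safer: derive a deterministic key, here the sensorId string.
    val bySensor: SCollection[(String, Double)] =
      measurements
        .map(m => (m.sensorId, m.reading))
        .reduceByKey(_ + _)

    bySensor.map { case (id, total) => s"$id: $total" }
      .saveAsTextFile("coder-example")  // illustrative output path

    sc.run()
  }
}
```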