Scala study note

          var test : String = _
scala> List(41, "cat") map { case i: Int ⇒ i + 1 }

Note that the “case statement” inside the curly bracket forms an Anonymous function, which take one parameter (ing) i and produce “i+1”. The case statement forms pattern matching partial function, so that “cat” should not be handled. Unfortunately, scala has complicated syntax and bad documentation. You have to dig into language specification to figure out this use case

https://www.scala-lang.org/files/archive/spec/2.11/08-pattern-matching.html

And see 2 for a better explanation for scala partial function.

4. Combining 2 with 3, we will have

scala> List(41, "cat") map { case i: Int ⇒ i + 1 } scala.MatchError: cat (of class java.lang.String)

and

scala> List(41, "cat") collect { case i: Int ⇒ i + 1 } res1: List[Int] = List(42)

This is because “collect” expect an partial function, and scala compiler is smart enough to infer the “case” expression into a partial function with correct “isDefinedAt” function (see 2 for partial function definition)

5. Scala implicit function, implicit parameter and implicit class

def convertToJson[T:Jsonable](x: T): Json = {
    implicitly[Jsonable[T]].serialize(x)
}
def convertToJson[T](x : T)(implicit js: Jsonable[T]) : Json = {
    sh.serialize(x)
}

 Essentially, [T : Jsonable] means an implicit parameter “js : Jsonable[T]) is passed into the function. “implicitly[Jsonable[T]]” essentially refer to that implicit variable

6. Scala product type:  You can think of a product class as a collection of object whose type are restricted to N predefined type, .e.g, Product4 means its object can be one of 4 predefined types. Therefore, Option is derived from product, since Options may the underneath data could be either null or some concrete object. All case classes are also Product.

Product sub-class are:

  • All case classes
  • List
  • Option

7. Scala Seq vs List – https://stackoverflow.com/questions/10866639/difference-between-a-seq-and-a-list-in-scala

  • Scala Seq is trait, equivalent to List interface in java
  • Scala’s List is an abstract class that is extended by Nil and ::, which are the concrete implementations of List

8. Case class and companion object – http://fruzenshtein.com/scala-case-class-companion-object/

  • case class is the data holder
  • case class companion object can be thought of as the “service” class for all case class, i.e., it is a singleton object hold all supporting methods taking a case class as a input parameter. Think those method as class’s static method in java. In java, static method of a class belongs to “class”, instead of a particular variable. In Case class companion object, it essentially belong to a global singleton object.
Advertisements
Scala study note

Spark Study Notes

Spark sql catalyst optimizer – how to turn a query into a execution plan

https://databricks.com/blog/2015/04/13/deep-dive-into-spark-sqls-catalyst-optimizer.html

4 Stages from AST input:

  • Analysis – figure out the type for each attribute (1000 lines of code)
  • Local optimization – apply optimization rule to logic plan (the tree) (800 lines of code)
  • Physical planning – generate multiple physical plan based on logic plan. Select the best physical plan based on a cost model (500 lines of code)
  • Code generation

 

Spark RDD transformations:

  • intersection(otherDataset)
    • Dataset A = (1, 2, 3), Daset B = (3, 4, 5),
    • A.intersection(B) = (3)
  • join(otherDataset, [numTasks])  – similar to inner join
    • Dataset A = ((1, a), (2, b)), Daset B = ((1, x),  (3, y)),
    • A.join(B) = ( (1,(a,x)) )
  • cogroup(otherDataset, [numTasks]) – similar to full out join, except that the result for the same key are organized as iteratable collection
    • Dataset A = ((1, a), (1, b),  (2, c)), Daset B = ((1, x),  (3, y)),
    • A.cogroup(B) = ( (1, [a, b], [x])), (2, [c], [])), (3, [], [y]) )
  • cartesian(otherDataset)
    • Dataset A = (1, 2), Daset B = (x, y),
    • A.cartesian(B) = ((1, x), (1, y), (2, x), (2, y))
  • coalesce(numPartitions) – repartition downward while avoiding full shuffle
    • Dataset A:
      • partition 1 = (1, 2, 3)
      • partition 2 = (4, 5, 6)
      • partition 3 = (7, 8, 9)
      • partition 4 = (10, 11, 12)
    • A.coalesce(2)
      • partition 1 = (1, 2, 3, 7, 8, 9)
      • partition 2 = (4, 5, 6, 10, 11, 12)
  • repartition(numPartitions) – full shuffle repartition

 

Spark Study Notes

git tip

  1. Remove local changes.  There is 3 types of local change file
    • Type 1: Staged tracked files
    • Type 2: Unstaged tracked files
    • Type 3: Unstaged untraced files.
    • git checkout .    : Remove type 2 file
    • git clean -f         : Remove type 3 file
    • git reset –hard : remove type 1 and type 2
    • git stash -u        : remove  type 1, 2, and 3
    • git reset –soft   : change type 1 to change 2
  2. Undo pushed change
    • git revert HEAD, or git revert {SHA}.( this will create a new commit which you can push and undo the previous change),
  3. Cherry pick a change from one git to another for merge
    • git fetch $git-1-url $branch && git cherry-pick $SHA  (then resolve merge conflicts, git add ,  e.g.,
    • git fetch git@github.zhukeyao/spark.git master && git cherry-pick {SHA} Resolve merge conflicts, git add
  4. create a patch and applied to another branch
    • git diff {SHA1} {SHA2} > patch.diff
    • git checkout target-branch
    • git apply patch.diff
  5. Add all modified file
    • git ls-files –modified  | xargs git add

git tip

Async programming mode in Hack(php)

Hack(Php) supports a very interesting programming mode called Async. The idea is to run IO related function asynchronously, in the single same main php thread. It is called cooperative multi-tasking. The sample code is as follows:

<?hh

namespace Hack\UserDocumentation\Async\Intro\Examples\Curl;

async function curl_A(): Awaitable<string> {
  $x = await \HH\Asio\curl_exec("http://example.com/");
  return $x;
}

async function curl_B(): Awaitable<string> {
  $y = await \HH\Asio\curl_exec("http://example.net/");
  return $y;
}

async function async_curl(): Awaitable<void> {
  $start = microtime(true);
  list($a, $b) = await \HH\Asio\v(array(curl_A(), curl_B()));
  $end = microtime(true);
  echo "Total time taken: " . strval($end - $start) . " seconds" . PHP_EOL;
}

\HH\Asio\join(async_curl());

In this example, we defined two async functions, each of which perform a time-consuming IO operation (curl). Through “await \HH\Asio\v(array(curl_A(), curl_B()))”, we basically be able to call the two async function simultaneously, i.e., parallelized two io operations. Note in each function, we directly return $x (or $y), whose type is “string”, even through the return type is specified as “Awaitable<string>”, as Awaitable<T> is something generated by HHVM automatically.

Here is my understanding on how Async is realized in Hack

  1. When a function is defined as “async”, it always return a “Waitable<T>” object. Inside the function, the programmer still return whatever type “T” result it want to return. This Waitable<T> should be automatically added by HHVM or HACK compiler.
  2. Without await function, The execution flow will not be stalled, the program will run synchronously until the async function returned, and the return value is a “Waitable”. If there is a real sync operation inside the called async function. A task will be saved in HHVM’s internal state, the task will be linked with the returned “Waitable<T>”.
  3. When “$x = await Waitable<T>” is called, the thread stop execution for current logic branch, until the underneath sync operation finished and Waitable<T>’s true value can be yielded.
  4. HHVM ueses \HH\Asio\v (for vector) and \HH\Asio\m (for map) to support schedule multiple async function call (i.e., multiple Waitables) logic branch all together, and be able to switch between when one got blocked. This is the key entry point for so called cooperative multi-tasking.  And one that certainly recursively constructive multiple level of sync calls under each execution logic branch. 
  5. There need to be some special handle for the final real “sync” operation, e.g., \HH\Asio\curl_exec as we shown in the example. Normal sync calls (mysql operation, sleep, curl) will not works and will block the entire thread and in-valid Sync purpose. I guess a special version of each regular sync call should be implemented within HHVM, which may be called “Extension” in HHVM document.
  6. There is a different between join and await. await only wait for current Awaitable object to be materialized, and allows HHVM to switch execution branch for other task, while “join” does not allows the switch. Therefore, “join” is usually used in top level un-async function, e.g.,  main().

Async is a very interesting mode I only see in Hack. It solves the problem how to parallelized time-consuming IO operations in an elegant way, at the programing language level. Usually in Java, we have to rely on user-space multi-threading to solve this kind of problem.



			
Async programming mode in Hack(php)

Java Thread Pool (ExecutorService)

Java concurrent package ExecutorService interface provides thread pool support. Different types of thread pools (ExecutorService) can be created using Executors class’s following static functions

  • newCachedThreadPool()
    • Create a new thread for a new submitted task (callable or future). Idle thread will be terminated after 60 seconds.
  • newFixedThreadPool(int nThreads)
    • Creates a thread pool that reuses a fixed number of threads operating off a shared unbounded queue
  • newSingleThreadExecutor()
    • Creates an Executor that uses a single worker thread operating off an unbounded queue
  • newScheduledThreadPool(int corePoolSize)
    • Creates a thread pool that can schedule commands to run after a given delay, or to execute periodically
  • newWorkStealPool(int parellelism)
    • Creates a thread pool that maintains enough threads to support the given parallelism level, and may use multiple queues to reduce contention.
    • Note that, this means each thread may have its own task queue, and other thread can “steal” task from neighbor thread if needed.
    • This function actual create ForkJoinPool, i.e.,
public static ExecutorService newWorkStealingPool() {
    return new ForkJoinPool(Runtime.getRuntime().availableProcessors(),
                            ForkJoinPool.defaultForkJoinWorkerThreadFactory,
                            null, true);
}
Java Thread Pool (ExecutorService)