Spark Study Notes

Spark SQL Catalyst optimizer – how to turn a query into an execution plan

https://databricks.com/blog/2015/04/13/deep-dive-into-spark-sqls-catalyst-optimizer.html

4 stages, starting from the AST:

Analysis – resolve each attribute and figure out its type (1000 lines of code)

Logical optimization – apply optimization rules to the logical plan (the tree) (800 lines of code)

Physical planning – generate multiple physical plans from the logical plan, then select the best one based on a cost model (500 lines of code)

Code generation
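
To make the "apply optimization rules to the tree" step concrete, here is a minimal sketch in Java of a single rule (constant folding) applied bottom-up to a toy expression tree. This is not Catalyst's API: Catalyst is written in Scala, and every class name below (Expr, Literal, Attribute, Add, ConstantFolding) is hypothetical and chosen only for illustration.

```java
import java.util.function.UnaryOperator;

// Toy expression tree. All names are hypothetical, not Catalyst's real classes.
abstract class Expr {
    // Apply the rule to the children first, then to the rebuilt node (bottom-up).
    abstract Expr transformUp(UnaryOperator<Expr> rule);
}

final class Literal extends Expr {
    final int value;
    Literal(int value) { this.value = value; }
    @Override Expr transformUp(UnaryOperator<Expr> rule) { return rule.apply(this); }
    @Override public String toString() { return Integer.toString(value); }
}

final class Attribute extends Expr {
    final String name;
    Attribute(String name) { this.name = name; }
    @Override Expr transformUp(UnaryOperator<Expr> rule) { return rule.apply(this); }
    @Override public String toString() { return name; }
}

final class Add extends Expr {
    final Expr left;
    final Expr right;
    Add(Expr left, Expr right) { this.left = left; this.right = right; }
    @Override Expr transformUp(UnaryOperator<Expr> rule) {
        return rule.apply(new Add(left.transformUp(rule), right.transformUp(rule)));
    }
    @Override public String toString() { return "(" + left + " + " + right + ")"; }
}

final class ConstantFolding {
    // Rule: Add(Literal, Literal) => Literal. Any other node is returned unchanged.
    static Expr fold(Expr e) {
        if (e instanceof Add) {
            Add add = (Add) e;
            if (add.left instanceof Literal && add.right instanceof Literal) {
                return new Literal(((Literal) add.left).value + ((Literal) add.right).value);
            }
        }
        return e;
    }

    static Expr optimize(Expr plan) {
        return plan.transformUp(ConstantFolding::fold);
    }

    public static void main(String[] args) {
        // x + (1 + 2)
        Expr plan = new Add(new Attribute("x"), new Add(new Literal(1), new Literal(2)));
        System.out.println(plan + "  =>  " + optimize(plan)); // (x + (1 + 2))  =>  (x + 3)
    }
}
```

A real optimizer would run many such rules repeatedly until the tree stops changing. The sketch applies one rule in a single pass.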

 


git tip

  1. Undo a committed (not pushed) change, but keep the change locally
    • git reset --soft HEAD~1
  2. Undo a committed (not pushed) change, discarding the change entirely
    • git reset --hard HEAD~1
  3. Undo a pushed change
    • git revert HEAD, or git revert {SHA} (this creates a new commit that undoes the previous change, which you can then push)
  4. Cherry-pick a change from one repository into another
    • git fetch {REPO_URL} {BRANCH} && git cherry-pick {SHA} (if there are merge conflicts, resolve them, git add the files, then run git cherry-pick --continue), e.g.,
    • git fetch git@github.zhukeyao/spark.git master && git cherry-pick {SHA}
  5. Create a patch and apply it to another branch
    • git diff {SHA1} {SHA2} > patch.diff
    • git checkout target-branch
    • git apply patch.diff

Async programming mode in Hack(php)

Hack (PHP) supports a very interesting programming mode called async. The idea is to run IO-related functions asynchronously within the single main PHP thread. This is called cooperative multitasking. The sample code is as follows:

<?hh

namespace Hack\UserDocumentation\Async\Intro\Examples\Curl;

async function curl_A(): Awaitable<string> {
  $x = await \HH\Asio\curl_exec("http://example.com/");
  return $x;
}

async function curl_B(): Awaitable<string> {
  $y = await \HH\Asio\curl_exec("http://example.net/");
  return $y;
}

async function async_curl(): Awaitable<void> {
  $start = microtime(true);
  list($a, $b) = await \HH\Asio\v(array(curl_A(), curl_B()));
  $end = microtime(true);
  echo "Total time taken: " . strval($end - $start) . " seconds" . PHP_EOL;
}

\HH\Asio\join(async_curl());

In this example, we define two async functions, each of which performs a time-consuming IO operation (curl). Through “await \HH\Asio\v(array(curl_A(), curl_B()))”, we are able to run the two async functions concurrently, i.e., overlap the two IO operations. Note that each function directly returns $x (or $y), whose type is “string”, even though the return type is declared as “Awaitable<string>”; the Awaitable<T> wrapper is generated automatically by HHVM.

Here is my understanding of how async is realized in Hack:

  1. When a function is declared “async”, it always returns an “Awaitable<T>” object. Inside the function, the programmer still returns a plain result of type “T”; the Awaitable<T> wrapper should be added automatically by HHVM or the Hack compiler.
  2. Calling an async function without await does not suspend the caller: the callee runs synchronously until it returns, and the return value is an “Awaitable”. If there is a real async operation inside the called async function, a task is saved in HHVM’s internal state and linked to the returned “Awaitable<T>”.
  3. When “$x = await Awaitable<T>” executes, the thread stops executing the current logic branch until the underlying async operation finishes and the Awaitable<T>’s actual value can be produced.
  4. HHVM uses \HH\Asio\v (for vectors) and \HH\Asio\m (for maps) to schedule multiple async function calls (i.e., multiple Awaitables) together as separate logic branches, and to switch between them when one gets blocked. This is the key entry point for so-called cooperative multitasking. Each logic branch can in turn recursively build multiple levels of async calls.
  5. The final, real async operation needs special handling, e.g., \HH\Asio\curl_exec as shown in the example. Normal synchronous calls (MySQL operations, sleep, curl) will not work: they block the entire thread and defeat the purpose of async. I guess a special version of each regular synchronous call has to be implemented within HHVM, possibly as what the HHVM documentation calls an “extension”.
  6. There is a difference between join and await. await only waits for the current Awaitable to be materialized and allows HHVM to switch to another execution branch in the meantime, while “join” does not allow the switch. Therefore, “join” is usually used in a top-level non-async function, e.g., main().

Async is a very interesting mode that I have only seen in Hack. It solves the problem of parallelizing time-consuming IO operations elegantly, at the programming-language level. In Java, we usually have to rely on user-space multi-threading to solve this kind of problem.
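
For comparison, here is a rough Java counterpart of the Hack example above, sketched with CompletableFuture on a small thread pool. It is only an analogy: the two calls run on separate threads rather than cooperatively on one thread, and slowFetch is a hypothetical stand-in that sleeps instead of doing a real curl.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

final class ParallelIo {
    // Hypothetical stand-in for a slow IO call such as curl; it only sleeps.
    static String slowFetch(String name, long millis) {
        try {
            TimeUnit.MILLISECONDS.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return "response from " + name;
    }

    // Start both calls on pool threads, then block until both have finished.
    static String[] fetchBoth(ExecutorService pool) {
        CompletableFuture<String> a = CompletableFuture.supplyAsync(() -> slowFetch("A", 200), pool);
        CompletableFuture<String> b = CompletableFuture.supplyAsync(() -> slowFetch("B", 200), pool);
        CompletableFuture.allOf(a, b).join();
        return new String[] { a.join(), b.join() };
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            long start = System.nanoTime();
            String[] results = fetchBoth(pool);
            long elapsedMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
            System.out.println(results[0] + ", " + results[1]);
            System.out.println("Total time taken: " + elapsedMs + " ms");
        } finally {
            pool.shutdown();
        }
    }
}
```

Because the two simulated calls overlap, the total time should be close to the longer of the two delays rather than their sum. Here allOf(...).join() plays roughly the role of \HH\Asio\v followed by a top-level join.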



			

Java Thread Pool (ExecutorService)

The ExecutorService interface in Java’s java.util.concurrent package provides thread pool support. Different types of thread pools (ExecutorService) can be created using the following static methods of the Executors class:

  • newCachedThreadPool()
    • Creates new threads as needed for submitted tasks (Runnable or Callable), reusing idle threads when they are available. Threads that stay idle for 60 seconds are terminated.
  • newFixedThreadPool(int nThreads)
    • Creates a thread pool that reuses a fixed number of threads operating off a shared unbounded queue
  • newSingleThreadExecutor()
    • Creates an Executor that uses a single worker thread operating off an unbounded queue
  • newScheduledThreadPool(int corePoolSize)
    • Creates a thread pool that can schedule commands to run after a given delay, or to execute periodically
  • newWorkStealingPool(int parallelism)
    • Creates a thread pool that maintains enough threads to support the given parallelism level, and may use multiple queues to reduce contention.
    • Note that this means each thread may have its own task queue, and a thread can “steal” tasks from another thread’s queue if needed.
    • This function actually creates a ForkJoinPool; e.g., the no-argument overload is:
public static ExecutorService newWorkStealingPool() {
    return new ForkJoinPool(Runtime.getRuntime().availableProcessors(),
                            ForkJoinPool.defaultForkJoinWorkerThreadFactory,
                            null, true);
}
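
As a usage illustration, the sketch below submits the same batch of Callable tasks to a fixed pool and to a work-stealing pool. The class name PoolDemo, the helper sumOfSquares, and the pool size of 4 are all assumptions made for the example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

final class PoolDemo {
    // Submit one task per number in 1..n to the given pool and sum the squares.
    static int sumOfSquares(ExecutorService pool, int n)
            throws InterruptedException, ExecutionException {
        List<Callable<Integer>> tasks = new ArrayList<>();
        for (int i = 1; i <= n; i++) {
            final int x = i;
            tasks.add(() -> x * x);
        }
        int sum = 0;
        // invokeAll blocks until every task has completed.
        for (Future<Integer> f : pool.invokeAll(tasks)) {
            sum += f.get();
        }
        return sum;
    }

    public static void main(String[] args) throws Exception {
        ExecutorService fixed = Executors.newFixedThreadPool(4);
        ExecutorService stealing = Executors.newWorkStealingPool();
        try {
            System.out.println(sumOfSquares(fixed, 10));    // 385
            System.out.println(sumOfSquares(stealing, 10)); // 385
        } finally {
            // Always shut pools down, otherwise non-daemon worker threads keep the JVM alive.
            fixed.shutdown();
            stealing.shutdown();
        }
    }
}
```

Since the calling code depends only on the ExecutorService interface, swapping one pool type for another needs no change to the task-submission logic.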